Changeset 2458
- Timestamp:
- 01/25/23 16:59:46 (16 months ago)
- Location:
- XIOS3/trunk
- Files:
-
- 35 edited
- 5 copied
Legend:
- Unmodified
- Added
- Removed
-
XIOS3/trunk
-
Property
svn:mergeinfo
set to
False
/XIOS3/dev/XIOS_FILE_SERVICES merged eligible
-
Property
svn:mergeinfo
set to
False
-
XIOS3/trunk/src/buffer_client.cpp
r2324 r2458 94 94 void CClientBuffer::attachWindows(vector<MPI_Win>& windows) 95 95 { 96 isAttachedWindows_=true ; 96 97 windows_=windows ; 97 98 if (windows_[0]==MPI_WIN_NULL && windows_[1]==MPI_WIN_NULL) hasWindows=false ; … … 170 171 bool CClientBuffer::isBufferFree(StdSize size) 171 172 { 172 173 if (!isAttachedWindows_) return false; 174 173 175 lockBuffer(); 174 176 count=*bufferCount[current] ; -
XIOS3/trunk/src/buffer_client.hpp
r2260 r2458 34 34 void fixBuffer(void) { isGrowableBuffer_=false ;} 35 35 void attachWindows(vector<MPI_Win>& windows) ; 36 bool isAttachedWindows(void) { return isAttachedWindows_ ;} 36 37 private: 37 38 void resizeBuffer(size_t newSize) ; … … 69 70 const MPI_Comm interComm; 70 71 std::vector<MPI_Win> windows_ ; 71 bool hasWindows ;72 72 bool hasWindows=false ; 73 bool isAttachedWindows_=false ; 73 74 double latency_=0 ; 74 75 double lastCheckedWithNothing_=0 ; -
XIOS3/trunk/src/client.cpp
r2418 r2458 188 188 189 189 CXios::launchDaemonsManager(false) ; 190 poolRessource_ = new CPoolRessource(clientComm, codeId ) ;190 poolRessource_ = new CPoolRessource(clientComm, codeId, false) ; 191 191 192 192 ///////////////////////////////////////// 193 193 ///////////// PART 4 //////////////////// 194 194 ///////////////////////////////////////// 195 195 /* 196 MPI_Request req ; 197 MPI_Status status ; 198 MPI_Ibarrier(xiosGlobalComm,&req) ; // be sure that all services are created now, could be remove later if more asynchronisity 199 int ok=false ; 200 while (!ok) 201 { 202 CXios::getDaemonsManager()->eventLoop() ; 203 MPI_Test(&req,&ok,&status) ; 204 } 205 */ 196 206 returnComm = clientComm ; 197 207 } -
XIOS3/trunk/src/config/context_attribute.conf
r1761 r2458 1 1 DECLARE_ATTRIBUTE(StdString, output_dir) 2 2 DECLARE_ATTRIBUTE(bool, attached_mode) 3 DECLARE_ATTRIBUTE(StdString, default_pool) 4 DECLARE_ATTRIBUTE(StdString, default_pool_writer) 5 DECLARE_ATTRIBUTE(StdString, default_pool_reader) 6 DECLARE_ATTRIBUTE(StdString, default_pool_gatherer) 7 DECLARE_ATTRIBUTE(StdString, default_writer) 8 DECLARE_ATTRIBUTE(StdString, default_gatherer) 9 DECLARE_ATTRIBUTE(StdString, default_reader) 10 DECLARE_ATTRIBUTE(bool, default_using_server2) -
XIOS3/trunk/src/config/file_attribute.conf
r1493 r2458 38 38 DECLARE_ATTRIBUTE(StdString, uuid_name) 39 39 DECLARE_ATTRIBUTE(StdString, uuid_format) 40 41 DECLARE_ATTRIBUTE(StdString, pool_writer) 42 DECLARE_ATTRIBUTE(StdString, writer) 43 DECLARE_ATTRIBUTE(StdString, pool_gatherer) 44 DECLARE_ATTRIBUTE(StdString, gatherer) 45 DECLARE_ATTRIBUTE(StdString, pool_reader) 46 DECLARE_ATTRIBUTE(StdString, reader) 47 DECLARE_ATTRIBUTE(bool, using_server2) -
XIOS3/trunk/src/config/node_type.conf
r2408 r2458 128 128 #endif //__XIOS_CPoolNode__ 129 129 130 #ifdef __XIOS_CServiceNode__ 131 DECLARE_NODE(ServiceNode, node) 132 #endif //__XIOS_CServiceNode__ 133 130 134 #ifdef __XIOS_CContext__ 131 135 DECLARE_NODE_PAR(Context, context) -
XIOS3/trunk/src/config/pool_attribute.conf
r2408 r2458 1 1 DECLARE_ATTRIBUTE(StdString, name) 2 DECLARE_ATTRIBUTE(int, nprocs) 3 DECLARE_ATTRIBUTE(double, global_fraction) 4 DECLARE_ATTRIBUTE(double, remain_fraction) 5 DECLARE_ATTRIBUTE(bool, remain) -
XIOS3/trunk/src/group_factory_decl.cpp
r2408 r2458 49 49 macro(CExtractDomainGroup) 50 50 macro(CPoolNodeGroup) 51 macro(CServiceNodeGroup) 51 52 52 53 } -
XIOS3/trunk/src/group_template_decl.cpp
r2408 r2458 38 38 macro(ExtractDomain) 39 39 macro(PoolNode) 40 macro(ServiceNode) 40 41 41 42 } -
XIOS3/trunk/src/manager/pool_ressource.cpp
r2404 r2458 10 10 namespace xios 11 11 { 12 CPoolRessource::CPoolRessource(MPI_Comm poolComm, const std::string& Id ) : Id_(Id), finalizeSignal_(false)12 CPoolRessource::CPoolRessource(MPI_Comm poolComm, const std::string& Id, bool isServer) : Id_(Id), finalizeSignal_(false) 13 13 { 14 14 int commRank, commSize ; … … 23 23 int globalLeaderRank ; 24 24 MPI_Comm_rank(CXios::getXiosComm(),&globalLeaderRank) ; 25 CXios::getRessourcesManager()->registerPool(Id, commSize, globalLeaderRank) ;25 if (isServer) CXios::getRessourcesManager()->registerPoolServer(Id, commSize, globalLeaderRank) ; 26 26 } 27 27 -
XIOS3/trunk/src/manager/pool_ressource.hpp
r2404 r2458 27 27 28 28 public: 29 CPoolRessource(MPI_Comm poolComm, const std::string& Id ) ;29 CPoolRessource(MPI_Comm poolComm, const std::string& Id, bool isServer) ; 30 30 ~CPoolRessource() ; 31 31 -
XIOS3/trunk/src/manager/ressources_manager.cpp
r2260 r2458 2 2 #include "server.hpp" 3 3 #include "servers_ressource.hpp" 4 #include "token_manager.hpp" 4 5 #include "timer.hpp" 5 6 … … 21 22 if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ; 22 23 else commRank=0 ; 24 tokenManager_ = new CTokenManager(xiosComm_,commRank) ; 25 23 26 MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ; 24 27 … … 163 166 buffer.realloc(maxBufferSize_) ; 164 167 165 buffer<< serverLeader_ ;168 buffer<<ressourcesSize_<<freeRessourcesSize_<<serverLeader_ ; 166 169 buffer<<(int) pools_.size(); 167 170 for(auto it=pools_.begin();it!=pools_.end(); ++it) … … 169 172 auto key = it->first ; 170 173 auto val = it->second ; 171 buffer << key<<std::get<0>(val) << std::get<1>(val);174 buffer << key<<std::get<0>(val) << std::get<1>(val) << std::get<2>(val); 172 175 } 173 176 } … … 177 180 std::string poolId ; 178 181 int size ; 182 int freeSize ; 179 183 int leader ; 180 184 181 buffer>> serverLeader_ ;185 buffer>>ressourcesSize_>>freeRessourcesSize_>>serverLeader_ ; 182 186 pools_.clear() ; 183 187 int nbPools ; … … 185 189 for(int i=0;i<nbPools;i++) 186 190 { 187 buffer>>poolId>>size>> leader ;188 pools_[poolId]=std::make_tuple(size, leader) ;191 buffer>>poolId>>size>>freeSize>>leader ; 192 pools_[poolId]=std::make_tuple(size, freeSize, leader) ; 189 193 } 190 194 } … … 210 214 211 215 212 void CRessourcesManager::registerPool(const string& poolId, int size, int leader) 213 { 214 winRessources_->lockWindow(managerGlobalLeader_,0) ; 215 winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ; 216 pools_[poolId] = make_tuple(size,leader) ; 216 void CRessourcesManager::registerPoolClient(const string& poolId, int size, int leader) 217 { 218 winRessources_->lockWindow(managerGlobalLeader_,0) ; 219 winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ; 220 pools_[poolId] = make_tuple(size, size, leader) ; 221 winRessources_->updateToWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ; 222 winRessources_->unlockWindow(managerGlobalLeader_,0) ; 223 } 224 225 void CRessourcesManager::registerPoolServer(const string& poolId, int size, int leader) 226 { 227 winRessources_->lockWindow(managerGlobalLeader_,0) ; 228 winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ; 229 pools_[poolId] = make_tuple(size, size, leader) ; 217 230 freeRessourcesSize_-=size ; 218 231 winRessources_->updateToWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ; … … 220 233 } 221 234 222 223 bool CRessourcesManager::getPoolInfo(const string& poolId, int& size, int& leader) 235 bool CRessourcesManager::getPoolInfo(const string& poolId, int& size, int& freeSize, int& leader) 224 236 { 225 237 winRessources_->lockWindow(managerGlobalLeader_,0) ; … … 232 244 { 233 245 size=get<0>(it->second) ; 234 leader=get<1>(it->second) ; 246 freeSize=get<1>(it->second) ; 247 leader=get<2>(it->second) ; 235 248 return true ; 236 249 } 250 } 251 252 bool CRessourcesManager::decreasePoolFreeSize(const string& poolId, int size) 253 { 254 bool ret ; 255 256 winRessources_->lockWindow(managerGlobalLeader_,0) ; 257 winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ; 258 259 260 auto it=pools_.find(poolId) ; 261 262 if ( it == pools_.end()) ret=false ; 263 else 264 { 265 get<1>(it->second)-=size ; 266 ret=true ; 267 } 268 winRessources_->updateToWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ; 269 winRessources_->unlockWindow(managerGlobalLeader_,0) ; 270 271 return ret ; 237 272 } 238 273 … … 257 292 bool CRessourcesManager::getPoolLeader(const string& poolId, int& leader) 258 293 { 259 int size ;260 return getPoolInfo(poolId, size, leader) ;294 int size, freeSize ; 295 return getPoolInfo(poolId, size, freeSize, leader) ; 261 296 } 262 297 263 298 bool CRessourcesManager::getPoolSize(const string& poolId, int& size) 264 299 { 265 int leader ; 266 return getPoolInfo(poolId, size, leader) ; 300 int leader,freeSize ; 301 return getPoolInfo(poolId, size, freeSize, leader) ; 302 } 303 304 bool CRessourcesManager::getPoolFreeSize(const string& poolId, int& freeSize) 305 { 306 int leader,size ; 307 return getPoolInfo(poolId, size, freeSize, leader) ; 267 308 } 268 309 269 310 bool CRessourcesManager::hasPool(const string& poolId) 270 311 { 271 int leader,size ; 272 return getPoolInfo(poolId, size, leader) ; 312 int leader,size,freeSize ; 313 return getPoolInfo(poolId, size, freeSize, leader) ; 314 } 315 316 void CRessourcesManager::waitPoolRegistration(const string& poolId) 317 { 318 while(!hasPool(poolId)) CXios::getDaemonsManager()->servicesEventLoop() ; 273 319 } 274 320 } -
XIOS3/trunk/src/manager/ressources_manager.hpp
r2260 r2458 11 11 #include "window_manager.hpp" 12 12 #include "pool_ressource.hpp" 13 #include "token_manager.hpp" 13 14 14 15 … … 45 46 int getRessourcesSize(void) ; 46 47 int getFreeRessourcesSize(void) ; 47 bool getPoolInfo(const string& poolId, int& size, int& leader) ;48 bool getPoolInfo(const string& poolId, int& size, int& freeSize, int& leader) ; 48 49 bool getPoolLeader(const string& poolId, int& leader) ; 49 50 bool getPoolSize(const string& poolId, int& size) ; 51 bool getPoolFreeSize(const string& poolId, int& freeSize) ; 50 52 bool hasPool(const string& poolId) ; 53 bool decreasePoolFreeSize(const string& poolId, int size) ; 54 void waitPoolRegistration(const string& poolId) ; 55 51 56 52 57 void registerServerLeader(int leaderRank) ; 53 58 void registerRessourcesSize(int size) ; 54 void registerPool(const std::string& poolId,int size,int leader) ; 59 void registerPoolClient(const std::string& poolId,int size,int leader) ; 60 void registerPoolServer(const std::string& poolId,int size,int leader) ; 61 CTokenManager* getTokenManager(void) {return tokenManager_ ;} 55 62 56 63 int managerGlobalLeader_ ; … … 59 66 60 67 CWindowManager* winNotify_ ; 68 CTokenManager* tokenManager_ ; 61 69 62 70 const size_t maxBufferSize_=1024*1024 ; … … 67 75 tuple<std::string, int> notifyCreatePool_ ; 68 76 69 std::map<std::string, std::tuple<int,int >> pools_ ;77 std::map<std::string, std::tuple<int,int,int>> pools_ ; 70 78 int serverLeader_ ; 71 79 int ressourcesSize_ ; -
XIOS3/trunk/src/manager/servers_ressource.cpp
r2274 r2458 50 50 bool isPartOf ; 51 51 52 for(int i=0 ; i<freeRessourcesRank_.size();i++)52 for(int i=0, j=0; i<freeRessourcesRank_.size();i++) 53 53 { 54 54 if (i<size) isPartOf=true ; … … 56 56 { 57 57 isPartOf=false ; 58 newFreeRessourcesRank[i]=freeRessourcesRank_[i] ; 58 newFreeRessourcesRank[j]=freeRessourcesRank_[i] ; 59 j++ ; 59 60 } 60 61 … … 165 166 if (isPartOf) 166 167 { 167 poolRessource_ = new CPoolRessource(poolComm, poolId ) ;168 poolRessource_ = new CPoolRessource(poolComm, poolId, true) ; 168 169 MPI_Comm_free(&poolComm) ; 169 170 } -
XIOS3/trunk/src/manager/services_manager.cpp
r2404 r2458 54 54 55 55 int leader ; 56 int poolSize ;56 int poolSize, poolFreeSize ; 57 57 58 58 info(40)<<"CServicesManager : waiting for pool info : "<<poolId<<endl ; ; 59 bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ;59 bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, poolFreeSize, leader) ; 60 60 if (wait) 61 61 { … … 63 63 { 64 64 CXios::getDaemonsManager()->eventLoop() ; 65 ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ;65 ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, poolFreeSize, leader) ; 66 66 } 67 67 } … … 70 70 { 71 71 info(40)<<"CServicesManager : create service notification to leader "<<leader<<", serviceId : "<<serviceId<<", size : "<<size<<endl ; 72 CXios::getRessourcesManager()->decreasePoolFreeSize(poolId ,size) ; 72 73 createServicesNotify(leader, serviceId, type, size, nbPartitions) ; 73 74 return true ; … … 81 82 int leader ; 82 83 int poolSize ; 84 int poolFreeSize ; 83 85 84 86 info(40)<<"CServicesManager : waiting for pool info : "<<poolId<<endl ; ; 85 bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ;87 bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, poolFreeSize, leader) ; 86 88 if (wait) 87 89 { … … 89 91 { 90 92 CXios::getDaemonsManager()->eventLoop() ; 91 ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ;93 ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, poolFreeSize, leader) ; 92 94 } 93 95 } … … 251 253 252 254 bool CServicesManager::getServiceInfo(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type, 253 int& size, int& nbPartitions, int& leader )255 int& size, int& nbPartitions, int& leader, bool wait) 254 256 { 255 257 … … 259 261 260 262 auto it=services_.find(std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)) ; 261 if ( it == services_.end() ) return false ;263 if ( it == services_.end() && !wait) return false ; 262 264 else 263 265 { 266 if (wait) waitServiceRegistration(poolId, serviceId, partitionId) ; 264 267 type= std::get<0>(it->second); 265 268 size= std::get<1>(it->second); … … 270 273 } 271 274 272 bool CServicesManager::getServiceLeader(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& leader )275 bool CServicesManager::getServiceLeader(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& leader, bool wait) 273 276 { 274 277 int type; … … 278 281 } 279 282 280 bool CServicesManager::getServiceType(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type )283 bool CServicesManager::getServiceType(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type, bool wait) 281 284 { 282 285 int size ; … … 286 289 } 287 290 288 bool CServicesManager::getServiceNbPartitions(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& nbPartitions )291 bool CServicesManager::getServiceNbPartitions(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& nbPartitions, bool wait) 289 292 { 290 293 int size ; … … 303 306 else return true ; 304 307 } 308 309 void CServicesManager::waitServiceRegistration(const std::string& poolId, const std::string& serviceId, const int& partitionId) 310 { 311 while(!hasService(poolId,serviceId,partitionId)) CXios::getDaemonsManager()->servicesEventLoop() ; 312 } 305 313 306 314 } -
XIOS3/trunk/src/manager/services_manager.hpp
r2404 r2458 43 43 44 44 void registerService(const std::string& poolId, const std::string& serviceId, const int& partitionId, int type, int size, int nbPartitions, int leader) ; 45 bool getServiceInfo(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type, int& size, int& nbPartition, int& leader ) ;46 bool getServiceLeader(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& leader ) ;47 bool getServiceType(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type ) ;48 bool getServiceNbPartitions(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& nbPartition ) ;45 bool getServiceInfo(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type, int& size, int& nbPartition, int& leader, bool wait=false) ; 46 bool getServiceLeader(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& leader, bool wait=false) ; 47 bool getServiceType(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type, bool wait=false) ; 48 bool getServiceNbPartitions(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& nbPartition, bool wait=false) ; 49 49 bool hasService(const std::string& poolId, const std::string& serviceId, const int& partitionId) ; 50 void waitServiceRegistration(const std::string& poolId, const std::string& serviceId, const int& partitionId); 50 51 void servicesDumpOut(CBufferOut& buffer) ; 51 52 void servicesDumpIn(CBufferIn& buffer) ; 53 int getRessourcesSize(const std::string& poolId) ; 54 int getFreeRessourcesSize(const std::string& poolId) ; 52 55 53 56 private: -
XIOS3/trunk/src/node/context.cpp
r2441 r2458 577 577 if (commRank==0) 578 578 { 579 CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions ) ;579 CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions, true) ; 580 580 for(int i=0 ; i<nbPartitions; i++) CXios::getContextsManager()->createServerContext(poolId, serverId, i, getContextId()) ; 581 581 } … … 588 588 parentServerContext_->createIntercomm(poolId, serverId, i, getContextId(), intraComm_, interCommClient, interCommServer) ; 589 589 int type ; 590 if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type ) ;590 if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type, true) ; 591 591 MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 592 592 string fullServerId=CXios::getContextsManager()->getServerContextName(poolId, serverId, i, type, getContextId()) ; … … 610 610 CATCH_DUMP_ATTR 611 611 612 613 // obsolete 612 614 void CContext::createServerInterComm(void) 613 615 TRY … … 645 647 } 646 648 CATCH_DUMP_ATTR 649 650 651 void CContext::getServerInterComm(const string& poolId, const string& serviceId, vector<pair<CContextClient*,CContextServer*>>& clientServers) 652 { 653 vector<pair<string, pair<CContextClient*,CContextServer*>>> retClientServers ; 654 655 auto it=serversMap_.find(make_pair(poolId,serviceId)) ; 656 if (it!=serversMap_.end()) clientServers=it->second ; 657 else 658 { 659 if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+serviceId, retClientServers) ; 660 else createServerInterComm(poolId, serviceId, retClientServers) ; 661 for(auto& retClientServer : retClientServers) clientServers.push_back(retClientServer.second) ; 662 663 int serviceType ; 664 if (intraCommRank_==0) CXios::getServicesManager()->getServiceType(poolId, serviceId, 0, serviceType) ; 665 MPI_Bcast(&serviceType,1,MPI_INT,0,intraComm_) ; 666 667 for(auto& clientServer : clientServers) 668 { 669 if (serviceType==CServicesManager::WRITER) { writerClientOut_.push_back(clientServer.first) ; writerServerOut_.push_back(clientServer.second) ; } 670 else if (serviceType==CServicesManager::READER) { readerClientOut_.push_back(clientServer.first) ; readerServerOut_.push_back(clientServer.second) ; } 671 else if (serviceType==CServicesManager::GATHERER) { writerClientOut_.push_back(clientServer.first) ; writerServerOut_.push_back(clientServer.second) ; } 672 } 673 serversMap_.insert(make_pair(make_pair(poolId,serviceId),clientServers)) ; 674 } 675 676 } 677 678 vector<CContextClient*> CContext::getContextClient(const string& poolId, const string& serviceId) 679 { 680 vector<pair<CContextClient*,CContextServer*>> clientServers ; 681 getServerInterComm(poolId, serviceId, clientServers ) ; 682 vector<CContextClient*> ret ; 683 for(auto& clientServer : clientServers) ret.push_back(clientServer.first) ; 684 return ret ; 685 } 686 647 687 648 688 void CContext::globalEventLoop(void) … … 698 738 for(auto couplerOut : couplerOutClient_) couplerOut.second->eventLoop(); 699 739 for(auto couplerIn : couplerInClient_) couplerIn.second->eventLoop(); 700 for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(enableEventsProcessing); 701 for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(enableEventsProcessing); 740 //for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(enableEventsProcessing); 741 //for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(enableEventsProcessing); 742 for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(); 743 for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(); 702 744 } 703 745 setCurrent(getId()) ; … … 783 825 CContextServer* server ; 784 826 827 /* 785 828 if (writerClientOut_.size()!=0) 786 829 { … … 803 846 info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ; 804 847 } 848 */ 849 850 for(int n=0; n<writerClientOut_.size() ; n++) 851 { 852 client=writerClientOut_[n] ; 853 server=writerServerOut_[n] ; 854 855 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize to writer"<<endl ; 856 client->finalize(); 857 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent to writer"<<endl ; 858 bool bufferReleased; 859 do 860 { 861 client->eventLoop(); 862 bufferReleased = !client->havePendingRequests(); 863 } while (!bufferReleased); 864 info(100)<<"DEBUG: context "<<getId()<<" no pending request on writer ok"<<endl ; 865 866 bool notifiedFinalized=false ; 867 do 868 { 869 notifiedFinalized=client->isNotifiedFinalized() ; 870 } while (!notifiedFinalized) ; 871 server->releaseBuffers(); 872 client->releaseBuffers(); 873 info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ; 874 } 875 805 876 806 877 if (readerClientOut_.size()!=0) … … 879 950 880 951 952 void CContext::setDefaultServices(void) 953 { 954 defaultPoolWriterId_ = CXios::defaultPoolId ; 955 defaultPoolReaderId_ = CXios::defaultPoolId ; 956 defaultPoolGathererId_ = CXios::defaultPoolId ; 957 defaultWriterId_ = CXios::defaultWriterId ; 958 defaultReaderId_ = CXios::defaultReaderId ; 959 defaultGathererId_ = CXios::defaultGathererId ; 960 defaultUsingServer2_ = CXios::usingServer2 ; 961 962 if (!default_pool.isEmpty()) defaultPoolWriterId_ = defaultPoolReaderId_= defaultPoolGathererId_= default_pool ; 963 if (!default_pool_writer.isEmpty()) defaultPoolWriterId_ = default_pool_writer ; 964 if (!default_pool_reader.isEmpty()) defaultPoolReaderId_ = default_pool_reader ; 965 if (!default_pool_gatherer.isEmpty()) defaultPoolGathererId_ = default_pool_gatherer ; 966 if (!default_writer.isEmpty()) defaultWriterId_ = default_writer ; 967 if (!default_reader.isEmpty()) defaultWriterId_ = default_reader ; 968 if (!default_gatherer.isEmpty()) defaultGathererId_ = default_gatherer ; 969 if (!default_using_server2.isEmpty()) defaultUsingServer2_ = default_using_server2 ; 970 } 971 881 972 /*! 882 973 \brief Close all the context defintion and do processing data … … 894 985 895 986 CTimer::get("Context : close definition").resume() ; 896 987 897 988 // create intercommunicator with servers. 898 989 // not sure it is the good place to be called here 899 createServerInterComm() ;990 //createServerInterComm() ; 900 991 901 992 … … 916 1007 // pour chacun des contextes. 917 1008 solveDescInheritance(true); 918 1009 setDefaultServices() ; 919 1010 // Check if some axis, domains or grids are eligible to for compressed indexed output. 920 1011 // Warning: This must be done after solving the inheritance and before the rest of post-processing … … 1023 1114 1024 1115 // Distribute files between secondary servers according to the data size => assign a context to a file and then to fields 1025 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER ) distributeFiles(this->enabledWriteModeFiles); 1026 //else if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledWriteModeFiles) file->setContextClient(client) ; 1116 1117 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) distributeFiles(this->enabledWriteModeFiles) ; 1118 /* 1119 if (serviceType_==CServicesManager::CLIENT ) 1120 { 1121 if (CXios::usingServer2) distributeFiles(this->enabledWriteModeFiles, defaultPoolGathererId_, defaultGathererId_); 1122 else distributeFiles(this->enabledWriteModeFiles, defaultPoolWriterId_, defaultWriterId_); 1123 } 1124 if (serviceType_==CServicesManager::GATHERER ) distributeFiles(this->enabledWriteModeFiles, defaultPoolWriterId_, defaultWriterId_); 1125 */ 1027 1126 1028 1127 // client side, assign context for file reading 1029 if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ; 1030 1128 // if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ; 1129 if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) 1130 { 1131 string poolReaderId ; 1132 string readerId ; 1133 file->getReaderServicesId(defaultPoolReaderId_, defaultReaderId_, poolReaderId, readerId) ; 1134 file->setContextClient(poolReaderId, readerId, 0) ; 1135 } 1136 1031 1137 // server side, assign context where to send file data read 1032 1138 if (serviceType_==CServicesManager::READER) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientIn_[0]) ; … … 1163 1269 // fix size for each context client 1164 1270 for(auto& it : fieldBufferEvaluation) it.first->setBufferSize(it.second) ; 1271 1165 1272 1166 1273 CTimer::get("Context : close definition").suspend() ; … … 1436 1543 TRY 1437 1544 { 1545 map< pair<string,string>, vector<CFile*>> fileMaps ; 1546 for(auto& file : files) 1547 { 1548 string poolWriterId ; 1549 string poolGathererId ; 1550 string writerId ; 1551 string gathererId ; 1552 bool usingServer2 ; 1553 1554 file->getWriterServicesId(defaultUsingServer2_, defaultPoolWriterId_, defaultWriterId_, defaultPoolGathererId_, defaultGathererId_, 1555 usingServer2, poolWriterId, writerId, poolGathererId, gathererId) ; 1556 if (serviceType_==CServicesManager::CLIENT && usingServer2) fileMaps[make_pair(poolGathererId,gathererId)].push_back(file) ; 1557 else fileMaps[make_pair(poolWriterId,writerId)].push_back(file) ; 1558 } 1559 for(auto& it : fileMaps) distributeFilesOnSameService(it.second, it.first.first, it.first.second) ; 1560 } 1561 CATCH_DUMP_ATTR 1562 1563 1564 void CContext::distributeFilesOnSameService(const vector<CFile*>& files, const string& poolId, const string& serviceId) 1565 TRY 1566 { 1438 1567 bool distFileMemory=false ; 1439 1568 distFileMemory=CXios::getin<bool>("server2_dist_file_memory", distFileMemory); 1440 1569 1441 int nbPools = writerClientOut_.size(); 1442 if (nbPools==1) distributeFileOverOne(files) ; 1443 else if (distFileMemory) distributeFileOverMemoryBandwith(files) ; 1444 else distributeFileOverBandwith(files) ; 1445 } 1446 CATCH_DUMP_ATTR 1447 1448 void CContext::distributeFileOverOne(const vector<CFile*>& files) 1449 TRY 1450 { 1451 for(auto& file : files) file->setContextClient(writerClientOut_[0]) ; 1452 } 1453 CATCH_DUMP_ATTR 1454 1455 void CContext::distributeFileOverBandwith(const vector<CFile*>& files) 1570 auto writers = getContextClient(poolId, serviceId) ; 1571 int nbPools = writers.size() ; 1572 1573 if (nbPools==1) distributeFileOverOne(files, poolId, serviceId) ; 1574 else if (distFileMemory) distributeFileOverMemoryBandwith(files, poolId, serviceId) ; 1575 else distributeFileOverBandwith(files, poolId, serviceId) ; 1576 } 1577 CATCH_DUMP_ATTR 1578 1579 void CContext::distributeFileOverOne(const vector<CFile*>& files, const string& poolId, const string& serviceId) 1580 TRY 1581 { 1582 for(auto& file : files) file->setContextClient(poolId, serviceId,0) ; 1583 } 1584 CATCH_DUMP_ATTR 1585 1586 void CContext::distributeFileOverBandwith(const vector<CFile*>& files, const string& poolId, const string& serviceId) 1456 1587 TRY 1457 1588 { … … 1459 1590 1460 1591 std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out); 1461 int nbPools = writerClientOut_.size(); 1592 auto writers = getContextClient(poolId, serviceId) ; 1593 int nbPools = writers.size(); 1594 //int nbPools = writerClientOut_.size(); 1462 1595 1463 1596 // (1) Find all enabled files in write mode … … 1512 1645 dataSize=(*poolDataSize.begin()).first ; 1513 1646 j=(*poolDataSize.begin()).second ; 1514 dataSizeMap[i].second->setContextClient( writerClientOut_[j]);1647 dataSizeMap[i].second->setContextClient(poolId, serviceId, j); 1515 1648 dataSize+=dataSizeMap[i].first; 1516 1649 poolDataSize.erase(poolDataSize.begin()) ; … … 1522 1655 CATCH_DUMP_ATTR 1523 1656 1524 void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList) 1525 TRY 1526 { 1527 int nbPools = writerClientOut_.size(); 1657 void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList, const string& poolId, const string& serviceId) 1658 TRY 1659 { 1660 auto writers = getContextClient(poolId, serviceId) ; 1661 int nbPools = writers.size(); 1662 1528 1663 double ratio=0.5 ; 1529 1664 ratio=CXios::getin<double>("server2_dist_file_memory_ratio", ratio); … … 1591 1726 } 1592 1727 } 1593 filesList[i]->setContextClient( writerClientOut_[files[i].assignedServer_]) ;1728 filesList[i]->setContextClient(poolId, serviceId, files[i].assignedServer_) ; 1594 1729 delete [] files[i].assignedGrid_ ; 1595 1730 } -
XIOS3/trunk/src/node/context.hpp
r2407 r2458 103 103 104 104 void initServer(MPI_Comm intraComm, int serviceType ); 105 void createClientInterComm(MPI_Comm interCommClient, MPI_Comm interCommServer) ; 106 107 void createServerInterComm(void) ; 108 void createServerInterComm_old(void) ; 109 void createServerInterComm(const string& poolId, const string& serverId, vector<pair<string, pair<CContextClient*,CContextServer*>>>& clientServers ) ; 110 111 105 112 106 bool isInitialized(void); 113 107 … … 123 117 124 118 bool isFinalized(void); 125 126 119 void closeDefinition(void); 127 120 … … 159 152 160 153 // Distribute files (in write mode) among secondary-server pools according to the estimated data flux 161 void distributeFiles(const std::vector<CFile*>& files); 162 void distributeFileOverOne(const vector<CFile*>& files) ; //!< Distribute files over one single server (no distribution) 163 void distributeFileOverBandwith(const std::vector<CFile*>& files) ; //!< Distribute files overs servers to balance the I/O bandwith 164 void distributeFileOverMemoryBandwith(const std::vector<CFile*>& files) ; //!< Distribute files overs servers to minimize the memory consumption 154 void distributeFiles(const vector<CFile*>& files) ; 155 void distributeFilesOnSameService(const vector<CFile*>& files, const string& poolId, const string& serviceId) ; 156 void distributeFileOverOne(const vector<CFile*>& files, const string& poolId, const string& serviceId) ; //!< Distribute files over one single server (no distribution) 157 void distributeFileOverBandwith(const std::vector<CFile*>& files, const string& poolId, const string& serviceId) ; //!< Distribute files overs servers to balance the I/O bandwith 158 void distributeFileOverMemoryBandwith(const std::vector<CFile*>& files, const string& poolId, const string& serviceId) ; //!< Distribute files overs servers to minimize the memory consumption 165 159 166 160 public: … … 271 265 bool setProcessingEvent(void) {isProcessingEvent_=true ;} 272 266 bool unsetProcessingEvent(void) {isProcessingEvent_=false ;} 273 MPI_Comm getIntraComm(void) {return intraComm_ ;} 274 int getIntraCommRank(void) {return intraCommRank_;} 275 int getIntraCommSize(void) {return intraCommSize_;} 276 267 277 268 void addCouplingChanel(const std::string& contextId, bool out) ; 278 269 … … 310 301 311 302 private: 312 std::string defaultReaderId ; 313 std::string defaultWriterId ; 314 std::string defaultGathererId ; 303 std::string defaultPoolWriterId_ ; 304 std::string defaultPoolReaderId_ ; 305 std::string defaultPoolGathererId_ ; 306 std::string defaultWriterId_ ; 307 std::string defaultReaderId_ ; 308 std::string defaultGathererId_ ; 309 bool defaultUsingServer2_ ; 310 void setDefaultServices(void) ; 311 312 313 std::map<std::pair<string,string>,std::vector<pair<CContextClient*,CContextServer*>>> serversMap_ ; 315 314 316 315 std::vector<CContextClient*> writerClientOut_ ; … … 342 341 std::map<std::string, CContextServer*> couplerInServer_ ; 343 342 public: 343 void createClientInterComm(MPI_Comm interCommClient, MPI_Comm interCommServer) ; 344 void createServerInterComm(void) ; // obsolete 345 void createServerInterComm_old(void) ; 346 void createServerInterComm(const string& poolId, const string& serverId, vector<pair<string, pair<CContextClient*,CContextServer*>>>& clientServers ) ; 347 void getServerInterComm(const string& poolId, const string& serviceId, vector<pair<CContextClient*,CContextServer*>>& clientServers) ; 348 vector<CContextClient*> getContextClient(const string& poolId, const string& serviceId) ; 344 349 CContextClient* getCouplerInClient(const string& contextId) { return couplerInClient_[contextId] ;} 345 350 CContextServer* getCouplerInServer(const string& contextId) { return couplerInServer_[contextId] ;} 346 351 CContextClient* getCouplerOutClient(const string& contextId) { return couplerOutClient_[contextId] ;} 347 352 CContextServer* getCouplerOutServer(const string& contextId) { return couplerOutServer_[contextId] ;} 348 349 CRegistry* registryIn=nullptr ; //!< input registry which is read from file 350 CRegistry* registryOut=nullptr ; //!< output registry which will be written into file at the finalize 353 354 public: // must be privatize using accessors 355 356 CRegistry* registryIn=nullptr ; //!< input registry which is read from file 357 CRegistry* registryOut=nullptr ; //!< output registry which will be written into file at the finalize 351 358 352 359 … … 354 361 int intraCommRank_ ; //! context intra communicator rank 355 362 int intraCommSize_ ; //! context intra communicator size 356 363 public: 364 MPI_Comm getIntraComm(void) {return intraComm_ ;} 365 int getIntraCommRank(void) {return intraCommRank_;} 366 int getIntraCommSize(void) {return intraCommSize_;} 357 367 private: 358 368 shared_ptr<CEventScheduler> eventScheduler_ ; //! The local event scheduler for context -
XIOS3/trunk/src/node/file.cpp
r2409 r2458 26 26 : CObjectTemplate<CFile>(), CFileAttributes() 27 27 , vFieldGroup(), data_out(), enabledFields(), fileComm(MPI_COMM_NULL) 28 , isOpen(false), read_client(0),checkRead(false), allZoneEmpty(false)28 , isOpen(false), checkRead(false), allZoneEmpty(false) 29 29 { 30 30 setVirtualFieldGroup(CFieldGroup::create(getId() + "_virtual_field_group")); … … 35 35 : CObjectTemplate<CFile>(id), CFileAttributes() 36 36 , vFieldGroup(), data_out(), enabledFields(), fileComm(MPI_COMM_NULL) 37 , isOpen(false), read_client(0),checkRead(false), allZoneEmpty(false)37 , isOpen(false), checkRead(false), allZoneEmpty(false) 38 38 { 39 39 setVirtualFieldGroup(CFieldGroup::create(getId() + "_virtual_field_group")); … … 1117 1117 CATCH_DUMP_ATTR 1118 1118 1119 void CFile::getWriterServicesId(bool defaultUsingServer2_, const string& defaultPoolWriterId_, const string& defaultWriterId_, const string& defaultPoolGathererId_, const string& defaultGathererId_, 1120 bool& usingServer2, string& poolWriterId, string& writerId, string& poolGathererId, string& gathererId) 1121 { 1122 usingServer2 = defaultUsingServer2_ ; 1123 poolWriterId = defaultPoolWriterId_ ; 1124 writerId = defaultWriterId_ ; 1125 poolGathererId = defaultPoolGathererId_ ; 1126 gathererId = defaultGathererId_ ; 1127 1128 if (!using_server2.isEmpty()) usingServer2 = using_server2; 1129 if (!pool_writer.isEmpty()) poolWriterId = pool_writer ; 1130 if (!writer.isEmpty()) writerId = writer; 1131 if (!pool_gatherer.isEmpty()) poolGathererId = pool_gatherer; 1132 if (!gatherer.isEmpty()) gathererId = gatherer; 1133 } 1134 1135 void CFile::getReaderServicesId(const string& defaultPoolReaderId_, const string& defaultReaderId_, string& poolReaderId, string& readerId) 1136 { 1137 poolReaderId = defaultPoolReaderId_ ; 1138 readerId = defaultReaderId_ ; 1139 1140 if (!pool_reader.isEmpty()) poolReaderId = pool_reader ; 1141 if (!reader.isEmpty()) readerId = reader; 1142 } 1143 1144 void CFile::setContextClient(const string& defaultPoolId, const string& defaultServiceId, int partitionId) 1145 TRY 1146 { 1147 CContext* context = CContext::getCurrent(); 1148 vector<CContextClient*> clients = context->getContextClient(defaultPoolId, defaultServiceId) ; 1149 setContextClient(clients[partitionId]) ; 1150 } 1151 CATCH_DUMP_ATTR 1152 1153 1119 1154 void CFile::setContextClient(CContextClient* newContextClient) 1120 1155 TRY … … 1136 1171 CATCH_DUMP_ATTR 1137 1172 1138 void CFile::setReadContextClient(CContextClient* readContextclient) 1139 TRY 1140 { 1141 read_client = readContextclient; 1142 } 1143 CATCH_DUMP_ATTR 1144 1145 CContextClient* CFile::getReadContextClient() 1146 TRY 1147 { 1148 return read_client; 1149 } 1150 CATCH_DUMP_ATTR 1151 1173 1152 1174 /*! 1153 1175 \brief Send a message to create a field on server side -
XIOS3/trunk/src/node/file.hpp
r2326 r2458 128 128 CVariable* addVariable(const string& id = ""); 129 129 CVariableGroup* addVariableGroup(const string& id = ""); 130 131 132 void getWriterServicesId(bool defaultUsingServer2_, const string& defaultPoolWriterId_, const string& defaultWriterId_, const string& defaultPoolGathererId_, const string& defaultGathererId_, 133 bool& usingServer2, string& poolWriterId, string& writerId, string& poolGathererId, string& gathererId) ; 134 void getReaderServicesId(const string& defaultPoolReaderId_, const string& defaultReaderId_, string& poolReaderId, string& readerId) ; 135 136 137 void setContextClient(const string& defaultPoolId, const string& defaultServiceId, int partitionId) ; 130 138 void setContextClient(CContextClient* newContextClient); 131 139 CContextClient* getContextClient(); 132 133 void setReadContextClient(CContextClient* newContextClient);134 CContextClient* getReadContextClient();135 140 136 141 // Send info to server … … 192 197 /// Propriétés privées /// 193 198 CContextClient* client; 194 CContextClient* read_client; // Context client for reading (channel between server 1 and client)195 199 CFieldGroup* vFieldGroup; 196 200 CVariableGroup* vVariableGroup; -
XIOS3/trunk/src/node/node_enum.hpp
r2408 r2458 44 44 eExtractDomain, 45 45 ePoolNode,gPoolNode, 46 eServiceNode,gServiceNode, 46 47 // eService, gService 47 48 } ENodeType; -
XIOS3/trunk/src/node/node_type.hpp
r2408 r2458 34 34 #include "extract_domain.hpp" 35 35 #include "pool_node.hpp" 36 #include "service_node.hpp" 36 37 37 38 -
XIOS3/trunk/src/node/pool_node.cpp
r2408 r2458 1 1 #include "pool_node.hpp" 2 #include "cxios.hpp" 3 #include<cmath> 2 4 3 5 namespace xios … … 5 7 6 8 CPoolNode::CPoolNode(void) : CObjectTemplate<CPoolNode>(), CPoolNodeAttributes() 7 { /* Ne rien faire de plus */ } 9 { 10 setVirtualServiceNodeGroup(CServiceNodeGroup::create(getId() + "_virtual_service_node_group")); 11 } 8 12 9 13 CPoolNode::CPoolNode(const StdString & id) : CObjectTemplate<CPoolNode>(id), CPoolNodeAttributes() 10 { /* Ne rien faire de plus */ } 14 { 15 setVirtualServiceNodeGroup(CServiceNodeGroup::create(getId() + "_virtual_service_node_group")); 16 } 11 17 12 18 CPoolNode::~CPoolNode(void) … … 17 23 { 18 24 SuperClass::parse(node); 25 if (node.goToChildElement()) 26 { 27 do 28 { 29 if (node.getElementName()=="service" || node.getElementName()=="service_group") this->getVirtualServiceNodeGroup()->parseChild(node); 30 } while (node.goToNextElement()); 31 node.goToParentElement(); 32 } 19 33 } 20 34 21 35 void CPoolNode::allocateRessources(void) 36 { 37 int nonEmpty=0 ; 38 if (!nprocs.isEmpty()) nonEmpty++ ; 39 if (!global_fraction.isEmpty()) nonEmpty++ ; 40 if (!remain_fraction.isEmpty()) nonEmpty++ ; 41 if (!remain.isEmpty()) nonEmpty++ ; 42 if (nonEmpty==0) ERROR("void CPoolNode::allocateRessources(void)",<<"A number a ressource for allocate pool must be specified." 43 <<"At least attributes, <nprocs> or <global_fraction> or <remain_fraction> or <remain> must be specified") 44 else if (nonEmpty>1) ERROR("void CPoolNode::allocateRessources(void)",<<"Only one of these attributes : <nprocs> or <global_fraction>" 45 <<" or <remain_fraction> or <remain> must be specified to determine allocated ressouces." 46 <<" More than one is currently specified") 47 auto ressourcesManager=CXios::getRessourcesManager() ; 48 int nbRessources ; 49 int globalRessources = ressourcesManager->getRessourcesSize() ; 50 int freeRessources = ressourcesManager->getFreeRessourcesSize() ; 51 if (!nprocs.isEmpty()) nbRessources = nprocs ; 52 if (!global_fraction.isEmpty()) nbRessources = std::round(globalRessources * global_fraction) ; 53 if (!remain_fraction.isEmpty()) nbRessources = std::round(freeRessources * remain_fraction) ; 54 if (!remain.isEmpty()) nbRessources = freeRessources ; 55 if (nbRessources>freeRessources) 56 ERROR("void CPoolNode::allocateRessources(void)",<<"Cannot allocate required ressources for the pool." 57 <<" Required is : "<<nbRessources<<" but free ressource is currently : "<<freeRessources) 58 string poolId ; 59 if (!name.isEmpty()) poolId=name ; 60 else if (!hasAutoGeneratedId() ) poolId=getId() ; 61 else ERROR("void CPoolNode::allocateRessources(void)",<<"Pool has no name or id, attributes <id> or <name> must be specified") 62 ressourcesManager->createPool(poolId, nbRessources) ; 63 ressourcesManager->waitPoolRegistration(poolId) ; 64 auto services=this->getAllServiceNodes() ; 65 for(auto& service : services) service->allocateRessources(poolId) ; 66 } 22 67 } 23 68 -
XIOS3/trunk/src/node/pool_node.hpp
r2408 r2458 10 10 #include "group_factory.hpp" 11 11 #include "declare_group.hpp" 12 #include "service_node.hpp" 12 13 13 14 … … 58 59 public: 59 60 virtual void parse(xml::CXMLNode & node); 61 void allocateRessources(void) ; 62 63 private: 64 std::vector<CServiceNode*> getAllServiceNodes(void) const {return this->vServiceNodeGroup->getAllChildren();} 65 CServiceNodeGroup* getVirtualServiceNodeGroup(void) const {return this->vServiceNodeGroup; } 66 void setVirtualServiceNodeGroup(CServiceNodeGroup* newVServiceNodeGroup) { this->vServiceNodeGroup = newVServiceNodeGroup; } 67 CServiceNodeGroup* vServiceNodeGroup; 60 68 61 69 }; // class CPoolNode -
XIOS3/trunk/src/object_factory_decl2.cpp
r2408 r2458 22 22 macro(CExtractDomain) 23 23 macro(CPoolNode) 24 macro(CServiceNode) 24 25 } 25 26 -
XIOS3/trunk/src/object_factory_decl3.cpp
r2408 r2458 19 19 macro(CZoomDomainGroup) 20 20 macro(CPoolNodeGroup) 21 macro(CServiceNodeGroup) 21 22 } 22 23 -
XIOS3/trunk/src/object_template_decl.cpp
r2408 r2458 37 37 template class CObjectTemplate<CExtractDomain>; 38 38 template class CObjectTemplate<CPoolNode>; 39 template class CObjectTemplate<CServiceNode>; 39 40 40 41 template class CObjectTemplate<CContextGroup>; … … 69 70 template class CObjectTemplate<CExtractDomainGroup>; 70 71 template class CObjectTemplate<CPoolNodeGroup>; 72 template class CObjectTemplate<CServiceNodeGroup>; 71 73 72 74 } -
XIOS3/trunk/src/server.cpp
r2437 r2458 21 21 #include "servers_ressource.hpp" 22 22 #include "services.hpp" 23 #include "pool_node.hpp" 23 24 #include <cstdio> 24 25 #include "workflow_graph.hpp" … … 188 189 if (serversRessource->isServerLeader()) 189 190 { 190 int nbRessources = ressourcesManager->getRessourcesSize() ; 191 if (!CXios::usingServer2) 191 // creating pool 192 CPoolNodeGroup::get("xios","pool_definition")->solveDescInheritance(true) ; 193 vector<CPoolNode*> pools = CPoolNodeGroup::get("xios","pool_definition")->getAllChildren(); 194 for(auto& pool : pools) pool->allocateRessources() ; 195 196 int nbRessources = ressourcesManager->getFreeRessourcesSize() ; 197 if (nbRessources>0) 192 198 { 193 ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 194 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ; 195 servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ; 196 } 197 else 198 { 199 int nprocsServer = nbRessources*CXios::ratioServer2/100.; 200 int nprocsGatherer = nbRessources - nprocsServer ; 199 if (!CXios::usingServer2) 200 { 201 ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 202 ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ; 203 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ; 204 servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ; 205 } 206 else 207 { 208 int nprocsServer = nbRessources*CXios::ratioServer2/100.; 209 int nprocsGatherer = nbRessources - nprocsServer ; 201 210 202 int nbPoolsServer2 = CXios::nbPoolsServer2 ;203 if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;204 ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;205 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;206 servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ;207 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ;208 209 211 int nbPoolsServer2 = CXios::nbPoolsServer2 ; 212 if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer; 213 ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 214 ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ; 215 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ; 216 servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ; 217 servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ; 218 } 210 219 } 211 220 // servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ; 212 221 } 222 /* 223 MPI_Request req ; 224 MPI_Status status ; 225 MPI_Ibarrier(xiosGlobalComm,&req) ; // be sure that all services are created now, could be remove later if more asynchronisity 226 int ok=false ; 227 while (!ok) 228 { 229 daemonsManager->eventLoop() ; 230 MPI_Test(&req,&ok,&status) ; 231 } 232 */ 213 233 CTimer::get("XIOS initialize").suspend() ; 214 234 -
XIOS3/trunk/src/transport/legacy_context_client.cpp
r2343 r2458 13 13 #include "server.hpp" 14 14 #include "services.hpp" 15 #include "ressources_manager.hpp" 15 16 #include <boost/functional/hash.hpp> 16 17 #include <random> … … 110 111 } 111 112 113 MPI_Request req ; 114 MPI_Status status ; 115 MPI_Ibarrier(intraComm,&req) ; 116 int flag ; 117 MPI_Test(&req,&flag,&status) ; 118 while(!flag) 119 { 120 callGlobalEventLoop() ; 121 MPI_Test(&req,&flag,&status) ; 122 } 123 124 112 125 timeLine++; 113 126 } … … 126 139 * \return whether the already allocated buffers could be used 127 140 */ 128 bool CLegacyContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, 129 bool nonBlocking /*= false*/) 141 void CLegacyContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers) 130 142 { 131 143 list<int>::const_iterator itServer, itSize; … … 134 146 list<CClientBuffer*>::iterator itBuffer; 135 147 bool areBuffersFree; 136 148 137 149 for (itServer = serverList.begin(); itServer != serverList.end(); itServer++) 138 150 { … … 140 152 if (it == buffers.end()) 141 153 { 154 CTokenManager* tokenManager = CXios::getRessourcesManager()->getTokenManager() ; 155 size_t token = tokenManager->getToken() ; 156 while (!tokenManager->lockToken(token)) callGlobalEventLoop() ; 142 157 newBuffer(*itServer); 143 158 it = buffers.find(*itServer); 159 checkAttachWindows(it->second,it->first) ; 160 tokenManager->unlockToken(token) ; 144 161 } 145 162 bufferList.push_back(it->second); … … 177 194 } 178 195 179 } while (!areBuffersFree && !nonBlocking);196 } while (!areBuffersFree); 180 197 CTimer::get("Blocking time").suspend(); 181 198 182 if (areBuffersFree) 183 { 184 for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 185 retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize)); 186 } 187 return areBuffersFree; 199 for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 200 retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize)); 188 201 } 189 202 … … 225 238 bufOut->put(sendBuff, 4); 226 239 buffer->checkBuffer(true); 227 240 /* 228 241 // create windows dynamically for one-sided 229 242 if (!isAttachedModeEnabled()) … … 254 267 buffer->attachWindows(windows_[rank]) ; 255 268 if (!isAttachedModeEnabled()) MPI_Barrier(winComm_[rank]) ; 256 257 } 258 269 */ 270 } 271 272 void CLegacyContextClient::checkAttachWindows(CClientBuffer* buffer, int rank) 273 { 274 if (!buffer->isAttachedWindows()) 275 { 276 // create windows dynamically for one-sided 277 if (!isAttachedModeEnabled()) 278 { 279 CTimer::get("create Windows").resume() ; 280 MPI_Comm interComm ; 281 MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ; 282 MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ; 283 CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 284 MPI_Comm_free(&interComm) ; 285 windows_[rank].resize(2) ; 286 287 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 288 CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ; 289 290 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]); 291 CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ; 292 293 CTimer::get("create Windows").suspend() ; 294 buffer->attachWindows(windows_[rank]) ; 295 MPI_Barrier(winComm_[rank]) ; 296 } 297 else 298 { 299 winComm_[rank] = MPI_COMM_NULL ; 300 windows_[rank].resize(2) ; 301 windows_[rank][0] = MPI_WIN_NULL ; 302 windows_[rank][1] = MPI_WIN_NULL ; 303 buffer->attachWindows(windows_[rank]) ; 304 } 305 306 } 307 } 308 309 310 259 311 /*! 260 312 Verify state of buffers. Buffer is under pending state if there is no message on it -
XIOS3/trunk/src/transport/legacy_context_client.hpp
r2343 r2458 45 45 46 46 // Functions to set/get buffers 47 bool getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false);47 void getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers); 48 48 void newBuffer(int rank); 49 void checkAttachWindows(CClientBuffer* buffer , int rank) ; 49 50 bool checkBuffers(list<int>& ranks); 50 51 bool checkBuffers(void); -
XIOS3/trunk/src/transport/legacy_context_server.cpp
r2433 r2458 95 95 96 96 traceOff(); 97 MPI_Improbe(MPI_ANY_SOURCE, 20, interComm,&flag,&message, &status);97 MPI_Improbe(MPI_ANY_SOURCE, 20, interComm,&flag,&message, &status); 98 98 traceOn(); 99 99 if (flag==true) listenPendingRequest(message, status) ; -
XIOS3/trunk/src/transport/one_sided_context_client.hpp
r2343 r2458 41 41 42 42 // Functions to set/get buffers 43 bool getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false);43 // bool getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false); 44 44 void newBuffer(int rank); 45 45 bool checkBuffers(list<int>& ranks); -
XIOS3/trunk/src/type/type_util.hpp
r2408 r2458 68 68 class CPoolNode ; 69 69 class CPoolNodeGroup ; 70 70 class CServiceNode ; 71 class CServiceNodeGroup ; 72 71 73 template <typename T> inline string getStrType(void); 72 74 … … 154 156 macro(CPoolNode) 155 157 macro(CPoolNodeGroup) 158 macro(CServiceNode) 159 macro(CServiceNodeGroup) 156 160 157 161 #undef macro -
XIOS3/trunk/src/xml_parser_decl.cpp
r2408 r2458 43 43 macro( ExtractDomain ) 44 44 macro( PoolNode ) 45 macro( ServiceNode ) 45 46 } 46 47 }
Note: See TracChangeset
for help on using the changeset viewer.