Changeset 1764
- Timestamp:
- 11/05/19 16:02:34 (3 years ago)
- Location:
- XIOS/dev/dev_ym/XIOS_SERVICES/src
- Files:
-
- 27 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_SERVICES/src/client.cpp
r1761 r1764 355 355 CXios::setXiosComm(xiosGlobalComm) ; 356 356 357 MPI_Comm commUnfree ; 358 MPI_Comm_dup(clientComm, &commUnfree ) ; 359 357 360 } 358 361 … … 732 735 CTimer::get("XIOS init/finalize",false).suspend() ; 733 736 CTimer::get("XIOS").suspend() ; 737 738 CXios::finalizeDaemonsManager() ; 739 734 740 if (!is_MPI_Initialized) 735 741 { … … 748 754 report(0)<< " Memory report : increasing it by a factor will increase performance, depending of the volume of data wrote in file at each time step of the file"<<endl ; 749 755 report(100)<<CTimer::getAllCumulatedTime()<<endl ; 756 757 750 758 } 751 759 -
XIOS/dev/dev_ym/XIOS_SERVICES/src/context_client.cpp
r1761 r1764 262 262 { 263 263 for (int i = 0; i < context->serverPrimServer.size(); ++i) context->serverPrimServer[i]->listen(); 264 CServer::contextEventLoop(false) ; // avoid dead-lock at finalize... 264 //ym CServer::contextEventLoop(false) ; // avoid dead-lock at finalize... 265 context->globalEventLoop() ; 265 266 } 266 267 -
XIOS/dev/dev_ym/XIOS_SERVICES/src/context_server.cpp
r1761 r1764 30 30 using namespace std ; 31 31 32 CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_) 32 CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_) : eventScheduler_(nullptr), isProcessingEvent_(false) 33 33 { 34 34 context=parent; … … 53 53 if (contextInfo.serviceType != CServicesManager::CLIENT) // we must have an event scheduler => to be retrieve from the associated services 54 54 { 55 eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;55 if (!isAttachedModeEnabled()) eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ; 56 56 } 57 57 … … 321 321 CEventServer* event; 322 322 323 if (context->isProcessingEvent()) return ; 323 // if (context->isProcessingEvent()) return ; 324 if (isProcessingEvent_) return ; 324 325 325 326 it=events.find(currentTimeLine); … … 330 331 if (event->isFull()) 331 332 { 332 if (!scheduled && CServer::eventScheduler) // Skip event scheduling for attached mode and reception on client side333 if (!scheduled && eventScheduler_) // Skip event scheduling for attached mode and reception on client side 333 334 { 334 CServer::eventScheduler->registerEvent(currentTimeLine,hashId);335 eventScheduler_->registerEvent(currentTimeLine,hashId); 335 336 scheduled=true; 336 337 } 337 else if (! CServer::eventScheduler || CServer::eventScheduler->queryEvent(currentTimeLine,hashId) )338 else if (!eventScheduler_ || eventScheduler_->queryEvent(currentTimeLine,hashId) ) 338 339 { 339 340 // When using attached mode, synchronise the processes to avoid that differents event be scheduled by differents processes 340 341 // The best way to properly solve this problem will be to use the event scheduler also in attached mode 341 342 // for now just set up a MPI barrier 342 if (!CServer::eventScheduler && CXios::isServer) MPI_Barrier(intraComm) ; 343 344 context->setProcessingEvent() ; 343 if (!eventScheduler_ && CXios::isServer) MPI_Barrier(intraComm) ; 344 345 // context->setProcessingEvent() ; 346 isProcessingEvent_=true ; 345 347 CTimer::get("Process events").resume(); 346 348 dispatchEvent(*event); 347 349 CTimer::get("Process events").suspend(); 348 context->unsetProcessingEvent() ; 350 isProcessingEvent_=false ; 351 // context->unsetProcessingEvent() ; 349 352 pendingEvent=false; 350 353 delete event; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/context_server.hpp
r1761 r1764 66 66 vector<MPI_Win> windows ; //! one sided mpi windows to expose client buffers to servers ; No memory will be attached on server side. 67 67 CEventScheduler* eventScheduler_ ; 68 bool isProcessingEvent_ ; 68 69 } ; 69 70 -
XIOS/dev/dev_ym/XIOS_SERVICES/src/cxios.cpp
r1761 r1764 341 341 } 342 342 343 344 void CXios::finalizeRessourcesManager() 345 { 346 delete ressourcesManager_; 347 } 348 349 void CXios::finalizeServicesManager() 350 { 351 delete servicesManager_ ; 352 } 353 354 void CXios::finalizeContextsManager() 355 { 356 delete contextsManager_ ; 357 } 358 359 void CXios::finalizeDaemonsManager() 360 { 361 delete daemonsManager_ ; 362 } 363 364 343 365 CPoolRessource* CXios::getPoolRessource(void) 344 366 { -
XIOS/dev/dev_ym/XIOS_SERVICES/src/cxios.hpp
r1761 r1764 91 91 static void launchRessourcesManager(bool isXiosServer) ; 92 92 93 static void finalizeServicesManager() ; 94 static void finalizeContextsManager() ; 95 static void finalizeDaemonsManager() ; 96 static void finalizeRessourcesManager() ; 97 93 98 static CRessourcesManager* getRessourcesManager(void) { return ressourcesManager_ ;} 94 99 static CServicesManager* getServicesManager(void) { return servicesManager_ ;} -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/contexts_manager.cpp
r1761 r1764 36 36 } 37 37 38 39 CContextsManager::~CContextsManager() 40 { 41 delete winNotify_ ; 42 delete winContexts_ ; 43 } 44 38 45 bool CContextsManager::createServerContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, 39 46 const string& contextId, bool wait) … … 48 55 while (!ok) 49 56 { 50 CXios::getDaemonsManager()-> eventLoop() ;57 CXios::getDaemonsManager()->servicesEventLoop() ; 51 58 ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ; 52 59 } … … 79 86 while (!ok) 80 87 { 81 CXios::getDaemonsManager()-> eventLoop() ;88 CXios::getDaemonsManager()->servicesEventLoop() ; 82 89 ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ; 83 90 if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/contexts_manager.hpp
r1761 r1764 28 28 29 29 CContextsManager(bool isXiosServer) ; 30 ~CContextsManager() ; 31 30 32 bool createServerContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const string& contextId, bool wait=true) ; 31 33 /* bool createServerContextIntercomm(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId, -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/daemons_manager.cpp
r1761 r1764 26 26 } 27 27 28 CDaemonsManager::~CDaemonsManager() 29 { 30 CXios::finalizeContextsManager() ; 31 CXios::finalizeServicesManager() ; 32 CXios::finalizeRessourcesManager() ; 33 } 34 28 35 bool CDaemonsManager::eventLoop(void) 29 36 { … … 31 38 CXios::getServicesManager()->eventLoop() ; 32 39 CXios::getContextsManager()->eventLoop() ; 33 if (isServer_) return CServer::getServersRessource()->eventLoop( ) ;34 else return CXios::getPoolRessource()->eventLoop( ) ;40 if (isServer_) return CServer::getServersRessource()->eventLoop(false) ; 41 else return CXios::getPoolRessource()->eventLoop(false) ; 35 42 } 43 44 bool CDaemonsManager::servicesEventLoop(void) 45 { 46 CXios::getRessourcesManager()->eventLoop() ; 47 CXios::getServicesManager()->eventLoop() ; 48 CXios::getContextsManager()->eventLoop() ; 49 if (isServer_) return CServer::getServersRessource()->eventLoop(true) ; 50 else return CXios::getPoolRessource()->eventLoop(true) ; 51 } 36 52 37 53 } -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/daemons_manager.hpp
r1761 r1764 11 11 12 12 CDaemonsManager(bool isXiosServer) ; 13 ~CDaemonsManager() ; 13 14 14 15 bool eventLoop(void) ; 16 bool servicesEventLoop(void) ; 15 17 16 18 private: -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/pool_ressource.cpp
r1761 r1764 100 100 } 101 101 102 bool CPoolRessource::eventLoop( void)102 bool CPoolRessource::eventLoop(bool serviceOnly) 103 103 { 104 104 checkCreateServiceNotification() ; 105 105 for (auto it=services_.begin(); it!=services_.end() ; ++it) 106 106 { 107 if (it->second->eventLoop( ))107 if (it->second->eventLoop(serviceOnly)) 108 108 { 109 109 services_.erase(it) ; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/pool_ressource.hpp
r1761 r1764 31 31 void checkCreateServiceNotification(void) ; 32 32 void createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in) ; 33 bool eventLoop( void) ;33 bool eventLoop(bool serviceOnly=false) ; 34 34 CService* getService(const std::string serviceId, int partitionId) { return services_[make_tuple(serviceId,partitionId)]; } 35 35 void finalizeSignal(void) ; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/ressources_manager.cpp
r1761 r1764 34 34 MPI_Barrier(xiosComm_) ; 35 35 } 36 36 37 CRessourcesManager::~CRessourcesManager() 38 { 39 delete winNotify_ ; 40 delete winRessources_ ; 41 } 37 42 38 43 void CRessourcesManager::createPool(const string& poolId, int size) -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/ressources_manager.hpp
r1761 r1764 28 28 29 29 CRessourcesManager(bool isXiosServer) ; 30 ~CRessourcesManager() ; 30 31 31 32 void eventLoop(void) ; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/server_context.cpp
r1761 r1764 15 15 16 16 CServerContext::CServerContext(CService* parentService, MPI_Comm contextComm, const std::string& poolId, const std::string& serviceId, 17 const int& partitionId, const std::string& contextId) : finalizeSignal_(false), parentService_(parentService) 17 const int& partitionId, const std::string& contextId) : finalizeSignal_(false), parentService_(parentService), 18 hasNotification_(false) 18 19 { 19 20 int localRank, globalRank, commSize ; … … 48 49 } 49 50 50 51 CServerContext::~CServerContext() 52 { 53 54 } 55 51 56 bool CServerContext::createIntercomm(const string& poolId, const string& serviceId, const int& partitionId, const string& contextId, 52 57 const MPI_Comm& intraComm, MPI_Comm& interCommClient, MPI_Comm& interCommServer, bool wait) … … 75 80 } 76 81 77 auto eventScheduler=parentService_->getEventScheduler() ; 78 std::hash<string> hashString ; 79 size_t hashId = hashString(name_) ; 80 size_t currentTimeLine=0 ; 81 eventScheduler->registerEvent(currentTimeLine,hashId); 82 83 while (!eventScheduler->queryEvent(currentTimeLine,hashId)) 84 { 85 CXios::getDaemonsManager()->eventLoop() ; 86 } 82 MPI_Request req ; 83 MPI_Status status ; 84 MPI_Ibarrier(intraComm,&req) ; 85 86 int flag=false ; 87 while(!flag) 88 { 89 CXios::getDaemonsManager()->servicesEventLoop() ; 90 MPI_Test(&req,&flag,&status) ; 91 } 92 // auto eventScheduler=parentService_->getEventScheduler() ; 93 // std::hash<string> hashString ; 94 // size_t hashId = hashString(name_) ; 95 // size_t currentTimeLine=0 ; 96 // eventScheduler->registerEvent(currentTimeLine,hashId); 97 // 98 // while (!eventScheduler->queryEvent(currentTimeLine,hashId)) 99 // { 100 // CXios::getDaemonsManager()->servicesEventLoop() ; 101 // eventScheduler->checkEvent() ; 102 // } 87 103 88 104 MPI_Bcast(&ok, 1, MPI_INT, 0, intraComm) ; … … 90 106 if (ok) 91 107 { 108 int globalRank ; 109 MPI_Comm_rank(xiosComm_,&globalRank) ; 110 MPI_Bcast(&contextLeader, 1, MPI_INT, 0, intraComm) ; 111 112 int overlap, nOverlap ; 113 if (contextLeader==globalRank) overlap=1 ; 114 else overlap=0 ; 115 MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ; 116 /* 92 117 int overlap ; 93 118 if (get<0>(overlapedComm_[name_])) overlap=1 ; … … 98 123 int commSize ; 99 124 MPI_Comm_size(contextComm_,&commSize ) ; 100 if (nOverlap==commSize) 101 { 125 */ 126 if (nOverlap> 0 ) 127 { 128 while (get<0>(overlapedComm_[name_])==false) CXios::getDaemonsManager()->servicesEventLoop() ; 129 isAttachedMode_=true ; 102 130 cout<<"CServerContext::createIntercomm : total overlap ==> context in attached mode"<<endl ; 103 131 interCommClient=newInterCommClient ; … … 107 135 { 108 136 cout<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ; 137 isAttachedMode_=false ; 109 138 MPI_Intercomm_create(intraComm, 0, xiosComm_, contextLeader, 3141, &interCommClient) ; 110 139 MPI_Comm_dup(interCommClient, &interCommServer) ; … … 128 157 for(int rank=0; rank<commSize; rank++) 129 158 { 130 notify Type_=NOTIFY_CREATE_INTERCOMM ;131 notify CreateIntercomm_ = make_tuple(remoteLeader, sourceContext) ;159 notifyOutType_=NOTIFY_CREATE_INTERCOMM ; 160 notifyOutCreateIntercomm_ = make_tuple(remoteLeader, sourceContext) ; 132 161 sendNotification(rank) ; 133 162 } … … 147 176 buffer.realloc(maxBufferSize_) ; 148 177 149 if (notify Type_==NOTIFY_CREATE_INTERCOMM)150 { 151 auto& arg=notify CreateIntercomm_ ;152 buffer << notify Type_ << std::get<0>(arg)<<std::get<1>(arg) ;178 if (notifyOutType_==NOTIFY_CREATE_INTERCOMM) 179 { 180 auto& arg=notifyOutCreateIntercomm_ ; 181 buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) ; 153 182 } 154 183 } … … 156 185 void CServerContext::notificationsDumpIn(CBufferIn& buffer) 157 186 { 158 if (buffer.bufferSize() == 0) notify Type_= NOTIFY_NOTHING ;187 if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ; 159 188 else 160 189 { 161 buffer>>notify Type_;162 if (notify Type_==NOTIFY_CREATE_INTERCOMM)163 { 164 auto& arg=notify CreateIntercomm_ ;190 buffer>>notifyInType_; 191 if (notifyInType_==NOTIFY_CREATE_INTERCOMM) 192 { 193 auto& arg=notifyInCreateIntercomm_ ; 165 194 buffer >> std::get<0>(arg)>> std::get<1>(arg) ; 166 195 } … … 170 199 void CServerContext::checkNotifications(void) 171 200 { 172 int commRank ; 173 MPI_Comm_rank(contextComm_, &commRank) ; 174 winNotify_->lockWindow(commRank,0) ; 175 winNotify_->popFromWindow(commRank, this, &CServerContext::notificationsDumpIn) ; 176 winNotify_->unlockWindow(commRank,0) ; 177 if (notifyType_==NOTIFY_CREATE_INTERCOMM) createIntercomm() ; 178 } 179 180 bool CServerContext::eventLoop(void) 201 if (!hasNotification_) 202 { 203 int commRank ; 204 MPI_Comm_rank(contextComm_, &commRank) ; 205 winNotify_->lockWindow(commRank,0) ; 206 winNotify_->popFromWindow(commRank, this, &CServerContext::notificationsDumpIn) ; 207 winNotify_->unlockWindow(commRank,0) ; 208 209 if (notifyInType_!= NOTIFY_NOTHING) 210 { 211 hasNotification_=true ; 212 auto eventScheduler=parentService_->getEventScheduler() ; 213 std::hash<string> hashString ; 214 size_t hashId = hashString(name_) ; 215 size_t currentTimeLine=0 ; 216 eventScheduler->registerEvent(currentTimeLine,hashId); 217 } 218 } 219 220 if (hasNotification_) 221 { 222 auto eventScheduler=parentService_->getEventScheduler() ; 223 std::hash<string> hashString ; 224 size_t hashId = hashString(name_) ; 225 size_t currentTimeLine=0 ; 226 if (eventScheduler->queryEvent(currentTimeLine,hashId)) 227 { 228 if (notifyInType_==NOTIFY_CREATE_INTERCOMM) createIntercomm() ; 229 hasNotification_=false ; 230 } 231 } 232 } 233 234 bool CServerContext::eventLoop(bool serviceOnly) 181 235 { 182 236 bool finished=false ; 183 checkNotifications() ;184 if ( context_!=nullptr)237 if (winNotify_!=nullptr) checkNotifications() ; 238 if (!serviceOnly && context_!=nullptr) 185 239 { 186 240 if (context_->eventLoop()) … … 198 252 { 199 253 MPI_Comm interCommServer, interCommClient ; 200 int remoteLeader=get<0>(notifyCreateIntercomm_) ; 201 string sourceContext=get<1>(notifyCreateIntercomm_) ; 254 auto& arg=notifyInCreateIntercomm_ ; 255 int remoteLeader=get<0>(arg) ; 256 string sourceContext=get<1>(arg) ; 202 257 203 258 auto it=overlapedComm_.find(sourceContext) ; … … 215 270 if (nOverlap==commSize) 216 271 { 217 cout<<"CServerContext::createIntercomm : total overlap ==> context in attached mode"<<endl ; 272 info(10)<<"CServerContext::createIntercomm : total overlap ==> context in attached mode"<<endl ; 273 isAttachedMode_=true ; 218 274 interCommClient=get<2>(it->second) ; 219 275 interCommServer=get<1>(it->second) ; … … 224 280 else if (nOverlap==0) 225 281 { 226 cout<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ; 282 info(10)<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ; 283 isAttachedMode_=false ; 227 284 MPI_Intercomm_create(contextComm_, 0, xiosComm_, remoteLeader, 3141, &interCommServer) ; 228 285 MPI_Comm_dup(interCommServer,&interCommClient) ; … … 233 290 else 234 291 { 235 cout<<"CServerContext::createIntercomm : partial overlap ==> not managed"<<endl;292 ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : partial overlap ==> not managed") ; 236 293 } 237 294 238 295 } 239 296 297 void CServerContext::freeComm(void) 298 { 299 delete winNotify_ ; 300 winNotify_=nullptr ; 301 MPI_Comm_free(&contextComm_) ; 302 // don't forget intercomm -> later 303 } 304 240 305 void CServerContext::finalizeSignal(void) 241 306 { 242 307 finalizeSignal_=true ; 243 308 } 244 245 309 246 310 } -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/server_context.hpp
r1761 r1764 18 18 CServerContext(CService* parentService, MPI_Comm contextComm, const std::string& poolId, const std::string& serviceId, 19 19 const int& partitionId, const std::string& contextId) ; 20 ~CServerContext() ; 20 21 21 22 bool createIntercomm(const string& poolId, const string& serviceId, const int& partitionId, const string& contextId, … … 26 27 void checkNotifications(void) ; 27 28 28 bool eventLoop( void) ;29 bool eventLoop(bool serviceOnly=false) ; 29 30 void notificationsDumpOut(CBufferOut& buffer) ; 30 31 void notificationsDumpIn(CBufferIn& buffer) ; 31 32 void finalizeSignal(void) ; 33 void freeComm(void) ; 34 bool isAttachedMode(void) { return isAttachedMode_ ;} 32 35 private: 33 36 void createIntercomm(void) ; … … 47 50 const size_t maxBufferSize_=1024*1024 ; 48 51 CWindowManager* winNotify_ ; 49 int notify Type_ ;50 tuple<int, std::string> notify CreateIntercomm_ ;52 int notifyInType_, notifyOutType_ ; 53 tuple<int, std::string> notifyInCreateIntercomm_, notifyOutCreateIntercomm_ ; 51 54 52 55 const int localLeader_=0 ; 53 56 int globalLeader_ ; 54 57 bool finalizeSignal_ ; 58 bool hasNotification_ ; 59 bool isAttachedMode_ ; 55 60 56 61 friend class CWindowManager ; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/servers_ressource.cpp
r1761 r1764 57 57 } 58 58 59 notify Type_=NOTIFY_CREATE_POOL ;60 notify CreatePool_ = make_tuple(poolId, isPartOf) ;59 notifyOutType_=NOTIFY_CREATE_POOL ; 60 notifyOutCreatePool_ = make_tuple(poolId, isPartOf) ; 61 61 sendNotification(freeRessourcesRank_[i]) ; 62 62 } … … 71 71 for(int rank=0; rank<commSize;rank++) 72 72 { 73 notify Type_=NOTIFY_FINALIZE ;73 notifyOutType_=NOTIFY_FINALIZE ; 74 74 sendNotification(rank) ; 75 75 } … … 89 89 buffer.realloc(maxBufferSize_) ; 90 90 91 if (notify Type_==NOTIFY_CREATE_POOL)91 if (notifyOutType_==NOTIFY_CREATE_POOL) 92 92 { 93 auto& arg=notify CreatePool_ ;94 buffer << notify Type_ << std::get<0>(arg) << std::get<1>(arg) ;93 auto& arg=notifyOutCreatePool_ ; 94 buffer << notifyOutType_ << std::get<0>(arg) << std::get<1>(arg) ; 95 95 } 96 else if (notify Type_==NOTIFY_FINALIZE) buffer << notifyType_ ;96 else if (notifyOutType_==NOTIFY_FINALIZE) buffer << notifyOutType_ ; 97 97 } 98 98 99 99 void CServersRessource::notificationsDumpIn(CBufferIn& buffer) 100 100 { 101 if (buffer.bufferSize() == 0) notify Type_= NOTIFY_NOTHING ;101 if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ; 102 102 else 103 103 { 104 buffer>>notify Type_;105 if (notify Type_==NOTIFY_CREATE_POOL)104 buffer>>notifyInType_; 105 if (notifyInType_==NOTIFY_CREATE_POOL) 106 106 { 107 auto& arg=notify CreatePool_ ;107 auto& arg=notifyInCreatePool_ ; 108 108 buffer >> std::get<0>(arg) >> std::get<1>(arg) ; 109 109 } 110 else if (notify Type_==NOTIFY_FINALIZE) { /*nothing to do*/}110 else if (notifyInType_==NOTIFY_FINALIZE) { /*nothing to do*/} 111 111 } 112 112 } 113 113 114 bool CServersRessource::eventLoop( )114 bool CServersRessource::eventLoop(bool serviceOnly) 115 115 { 116 116 checkNotifications() ; 117 117 if (poolRessource_!=nullptr) 118 118 { 119 if (poolRessource_->eventLoop( ))119 if (poolRessource_->eventLoop(serviceOnly)) 120 120 { 121 121 poolRessource_=nullptr ; … … 135 135 winNotify_->popFromWindow(commRank, this, &CServersRessource::notificationsDumpIn) ; 136 136 winNotify_->unlockWindow(commRank,0) ; 137 if (notify Type_==NOTIFY_CREATE_POOL) createPool() ;138 else if (notify Type_==NOTIFY_FINALIZE) finalizeSignal() ;137 if (notifyInType_==NOTIFY_CREATE_POOL) createPool() ; 138 else if (notifyInType_==NOTIFY_FINALIZE) finalizeSignal() ; 139 139 } 140 140 141 141 void CServersRessource::createPool(void) 142 142 { 143 auto& arg=notify CreatePool_ ;143 auto& arg=notifyInCreatePool_ ; 144 144 string poolId=get<0>(arg) ; 145 145 bool isPartOf=get<1>(arg) ; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/servers_ressource.hpp
r1761 r1764 26 26 void createPool(const string& poolId, const int size) ; 27 27 void createPool(void) ; 28 bool eventLoop( void) ;28 bool eventLoop(bool serviceOnly=false) ; 29 29 void sendNotification(int rank) ; 30 30 void notificationsDumpOut(CBufferOut& buffer) ; … … 44 44 const size_t maxBufferSize_=1024*1024 ; 45 45 CWindowManager* winNotify_ ; 46 int notifyType_ ; 47 std::tuple<std::string, bool> notifyCreatePool_ ; 46 47 int notifyInType_,notifyOutType_ ; 48 std::tuple<std::string, bool> notifyInCreatePool_,notifyOutCreatePool_ ; 48 49 CPoolRessource* poolRessource_ ; 49 50 bool finalizeSignal_ ; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/services.cpp
r1761 r1764 10 10 CService::CService(MPI_Comm serviceComm, const std::string& poolId, const std::string& serviceId, const int& partitionId, 11 11 int type, int nbPartitions) : finalizeSignal_(false), eventScheduler_(nullptr), poolId_(poolId), serviceId_(serviceId), 12 partitionId_(partitionId), type_(type), nbPartitions_(nbPartitions) 12 partitionId_(partitionId), type_(type), nbPartitions_(nbPartitions), hasNotification_(false) 13 13 14 14 … … 34 34 } 35 35 eventScheduler_ = new CEventScheduler(serviceComm_) ; 36 37 ostringstream oss; 38 oss<<partitionId; 39 name_= poolId+"::"+serviceId+"_"+oss.str(); 36 40 } 37 41 … … 41 45 MPI_Comm_size(serviceComm_, &commSize) ; 42 46 43 for(int rank=0; rank<commSize; rank++) createContextNotify(rank, poolId, serviceId, partitionId, contextId) ; 47 for(int rank=0; rank<commSize; rank++) 48 { 49 notifyOutType_=NOTIFY_CREATE_CONTEXT ; 50 notifyOutCreateContext_ = make_tuple(poolId, serviceId, partitionId, contextId) ; 51 sendNotification(rank) ; 52 } 44 53 } 45 54 /* … … 87 96 } 88 97 89 bool CService::eventLoop(void) 90 { 91 checkCreateContextNotification() ; 98 bool CService::eventLoop(bool serviceOnly) 99 { 100 //checkCreateContextNotification() ; 101 checkNotifications() ; 102 92 103 eventScheduler_->checkEvent() ; 93 104 for(auto it=contexts_.begin();it!=contexts_.end();++it) 94 105 { 95 if (it->second->eventLoop( ))106 if (it->second->eventLoop(serviceOnly)) 96 107 { 97 108 contexts_.erase(it) ; … … 105 116 } 106 117 118 void CService::sendNotification(int rank) 119 { 120 winNotify_->lockWindow(rank,0) ; 121 winNotify_->pushToWindow(rank, this, &CService::notificationsDumpOut) ; 122 winNotify_->unlockWindow(rank,0) ; 123 } 124 125 126 void CService::notificationsDumpOut(CBufferOut& buffer) 127 { 128 129 buffer.realloc(maxBufferSize_) ; 130 131 if (notifyOutType_==NOTIFY_CREATE_CONTEXT) 132 { 133 auto& arg=notifyOutCreateContext_ ; 134 buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) << std::get<2>(arg)<<std::get<3>(arg) ; 135 } 136 } 137 138 void CService::notificationsDumpIn(CBufferIn& buffer) 139 { 140 if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ; 141 else 142 { 143 buffer>>notifyInType_; 144 if (notifyInType_==NOTIFY_CREATE_CONTEXT) 145 { 146 info(10)<<"NotifyDumpOut"<<endl ; 147 auto& arg=notifyInCreateContext_ ; 148 buffer >> std::get<0>(arg)>> std::get<1>(arg) >> std::get<2>(arg)>> std::get<3>(arg); 149 } 150 } 151 } 152 153 154 155 156 void CService::checkNotifications(void) 157 { 158 if (!hasNotification_) 159 { 160 int commRank ; 161 MPI_Comm_rank(serviceComm_, &commRank) ; 162 winNotify_->lockWindow(commRank,0) ; 163 winNotify_->popFromWindow(commRank, this, &CService::notificationsDumpIn) ; 164 winNotify_->unlockWindow(commRank,0) ; 165 166 if (notifyInType_!= NOTIFY_NOTHING) 167 { 168 hasNotification_=true ; 169 std::hash<string> hashString ; 170 size_t hashId = hashString(name_) ; 171 size_t currentTimeLine=0 ; 172 eventScheduler_->registerEvent(currentTimeLine,hashId); 173 } 174 } 175 176 if (hasNotification_) 177 { 178 std::hash<string> hashString ; 179 size_t hashId = hashString(name_) ; 180 size_t currentTimeLine=0 ; 181 if (eventScheduler_->queryEvent(currentTimeLine,hashId)) 182 { 183 if (notifyInType_==NOTIFY_CREATE_CONTEXT) createContext() ; 184 hasNotification_=false ; 185 } 186 } 187 } 188 189 190 191 107 192 void CService::checkCreateContextNotification(void) 108 193 { … … 122 207 } 123 208 209 void CService::createContext(void) 210 { 211 auto& arg=notifyInCreateContext_ ; 212 string poolId = get<0>(arg) ; 213 string& serviceId = get<1>(arg) ; 214 int partitionId = get<2>(arg) ; 215 string contextId = get<3>(arg) ; 216 contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ; 217 } 218 219 //to remove 124 220 void CService::createNewContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId) 125 221 { -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/services.hpp
r1761 r1764 16 16 public: 17 17 18 const int NOTIFY_NOTHING=0 ; 19 const int NOTIFY_CREATE_CONTEXT=1 ; 20 18 21 CService(MPI_Comm serviceComm, const std::string& poolId, const std::string& serviceId, const int& partitionId, 19 22 int type, int nbPartitions) ; 20 bool eventLoop( void) ;23 bool eventLoop(bool serviceOnly=false) ; 21 24 void createContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId) ; 22 25 void checkCreateContextNotification(void) ; … … 36 39 37 40 private: 38 41 void sendNotification(int rank) ; 42 void notificationsDumpOut(CBufferOut& buffer) ; 43 void notificationsDumpIn(CBufferIn& buffer) ; 44 void checkNotifications(void) ; 45 void createContext(void) ; 46 39 47 MPI_Comm serviceComm_ ; 40 48 MPI_Comm globalComm_ ; 41 49 42 50 const size_t maxBufferSize_=1024*1024 ; 43 51 const int localLeader_=0 ; 44 52 int globalLeader_ ; 45 53 CWindowManager* winNotify_ ; 54 55 std::string name_ ; 56 46 57 std::list<std::tuple<std::string, std::string, int, std::string>> notifications_; 58 59 bool hasNotification_ ; 60 int notifyInType_,notifyOutType_ ; 61 std::tuple<std::string, std::string, int, std::string> notifyInCreateContext_, notifyOutCreateContext_ ; 62 47 63 std::map<std::string, CServerContext*> contexts_ ; 48 64 bool finalizeSignal_ ; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/services_manager.cpp
r1761 r1764 42 42 } 43 43 44 CServicesManager::~CServicesManager() 45 { 46 delete winNotify_ ; 47 delete winServices_ ; 48 } 49 44 50 bool CServicesManager::createServices(const std::string& poolId, const std::string& serviceId, 45 51 int type, int size, int nbPartitions, bool wait) -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/services_manager.hpp
r1761 r1764 27 27 28 28 CServicesManager(bool isXiosServer) ; 29 ~CServicesManager() ; 30 29 31 bool createServices(const std::string& poolId, const std::string& serviceId, int type, int size, int nbPartition, bool wait=true) ; 30 32 void createServicesNotify(int rank, const string& serviceId, int type, int size, int nbPartitions) ; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/window_manager.hpp
r1761 r1764 126 126 } 127 127 128 ~CWindowManager() 129 { 130 MPI_Win_free(&window_) ; 131 } 128 132 } ; 129 133 } -
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/context.cpp
r1761 r1764 36 36 , isPostProcessed(false), finalized(false) 37 37 , idServer_(), client(nullptr), server(nullptr) 38 , allProcessed(false), countChildC tx_(0), isProcessingEvent_(false)38 , allProcessed(false), countChildContextFinalized_(0), isProcessingEvent_(false) 39 39 40 40 { /* Ne rien faire de plus */ } … … 45 45 , isPostProcessed(false), finalized(false) 46 46 , idServer_(), client(nullptr), server(nullptr) 47 , allProcessed(false), countChildC tx_(0), isProcessingEvent_(false)47 , allProcessed(false), countChildContextFinalized_(0), isProcessingEvent_(false) 48 48 { /* Ne rien faire de plus */ } 49 49 … … 475 475 MPI_Comm_dup(intraComm_, &intraCommClient); 476 476 comms.push_back(intraCommClient); 477 477 // attached_mode=parentServerContext_->isAttachedMode() ; //ym probably inherited from source context 478 478 server = new CContextServer(this,intraComm_, interCommServer); // check if we need to dupl. intraComm_ ? 479 479 client = new CContextClient(this,intraCommClient,interCommClient); … … 687 687 CATCH_DUMP_ATTR 688 688 689 689 690 void CContext::globalEventLoop(void) 691 { 692 CXios::getDaemonsManager()->eventLoop() ; 693 setCurrent(getId()) ; 694 } 690 695 691 696 … … 693 698 TRY 694 699 { 695 if (hasClient && !hasServer) // For now we only use server level 1 to read data 700 registryOut->hierarchicalGatherRegistry() ; 701 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 702 703 if (hasClient && !hasServer) 696 704 { 697 doPreTimestepOperationsForEnabledReadModeFiles(); 698 } 699 // Send registry upon calling the function the first time 700 if (countChildCtx_ == 0) if (hasClient) sendRegistry() ; 701 702 // Client: 703 // (1) blocking send context finalize to its server 704 // (2) blocking receive context finalize from its server 705 // (3) some memory deallocations 706 if (CXios::isClient) 707 { 708 // Make sure that client (model) enters the loop only once 709 if (countChildCtx_ < 1) 710 { 711 ++countChildCtx_; 712 713 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 714 client->finalize(); 715 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 716 while (client->havePendingRequests()) client->checkBuffers(); 717 718 info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ; 719 while (!server->hasFinished()) 720 server->eventLoop(); 721 info(100)<<"DEBUG: context "<<getId()<<" server has finished"<<endl ; 722 705 doPreTimestepOperationsForEnabledReadModeFiles(); // For now we only use server level 1 to read data 706 707 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 708 client->finalize(); 709 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 710 while (client->havePendingRequests()) client->checkBuffers(); 711 info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ; 723 712 bool notifiedFinalized=false ; 724 713 do … … 727 716 } while (!notifiedFinalized) ; 728 717 client->releaseBuffers(); 729 730 if (hasServer) // Mode attache731 {732 closeAllFile();733 registryOut->hierarchicalGatherRegistry() ;734 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ;735 }736 737 //! Deallocate client buffers738 // client->releaseBuffers();739 718 info(100)<<"DEBUG: context "<<getId()<<" release client ok"<<endl ; 740 //! Free internally allocated communicators 741 for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 742 MPI_Comm_free(&(*it)); 743 comms.clear(); 744 745 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 746 } 747 } 748 else if (CXios::isServer) 749 { 750 // First context finalize message received from a model 751 // Send context finalize to its child contexts (if any) 752 if (countChildCtx_ == 0) 719 } 720 else if (hasClient && hasServer) 721 { 753 722 for (int i = 0; i < clientPrimServer.size(); ++i) 754 723 { … … 764 733 do 765 734 { 766 // clientPrimServer[i]->checkBuffers();767 735 notifiedFinalized=clientPrimServer[i]->isNotifiedFinalized() ; 768 736 } while (!notifiedFinalized) ; 769 737 clientPrimServer[i]->releaseBuffers(); 770 738 } 771 772 773 // (Last) context finalized message received 774 if (countChildCtx_ == clientPrimServer.size()) 775 { 776 // Blocking send of context finalize message to its client (e.g. primary server or model) 777 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 778 client->finalize(); 779 info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 780 bool bufferReleased; 781 do 782 { 783 client->checkBuffers(); 784 bufferReleased = !client->havePendingRequests(); 785 } while (!bufferReleased); 786 787 bool notifiedFinalized=false ; 788 do 789 { 790 // client->checkBuffers(); 791 notifiedFinalized=client->isNotifiedFinalized() ; 792 } while (!notifiedFinalized) ; 793 client->releaseBuffers(); 794 795 finalized = true; 796 info(100)<<"DEBUG: context "<<getId()<<" bufferRelease OK"<<endl ; 797 798 closeAllFile(); // Just move to here to make sure that server-level 1 can close files 739 closeAllFile(); 740 741 } 742 else if (!hasClient && hasServer) 743 { 744 closeAllFile(); 745 } 746 747 freeComms() ; 799 748 800 /* ym 801 if (hasServer && !hasClient) 802 { 803 registryOut->hierarchicalGatherRegistry() ; 804 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 805 } 806 */ 807 808 //! Deallocate client buffers 809 // client->releaseBuffers(); 810 info(100)<<"DEBUG: context "<<getId()<<" client release"<<endl ; 811 812 /* 813 for (int i = 0; i < clientPrimServer.size(); ++i) 814 clientPrimServer[i]->releaseBuffers(); 815 */ 816 //! Free internally allocated communicators 817 for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 818 MPI_Comm_free(&(*it)); 819 comms.clear(); 820 821 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 822 } 823 824 ++countChildCtx_; 825 } 749 parentServerContext_->freeComm() ; 750 finalized = true; 751 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 826 752 } 827 753 CATCH_DUMP_ATTR … … 833 759 TRY 834 760 { 761 int countChildCtx_ ; // ym temporary 762 835 763 if (hasClient && !hasServer) // For now we only use server level 1 to read data 836 764 { … … 2555 2483 CATCH_DUMP_ATTR 2556 2484 2485 2486 void CContext::sendFinalizeClient(CContextClient* contextClient, const string& contextClientId) 2487 TRY 2488 { 2489 CEventClient event(getType(),EVENT_ID_CONTEXT_FINALIZE_CLIENT); 2490 if (contextClient->isServerLeader()) 2491 { 2492 CMessage msg; 2493 msg<<contextClientId ; 2494 const std::list<int>& ranks = contextClient->getRanksServerLeader(); 2495 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 2496 event.push(*itRank,1,msg); 2497 contextClient->sendEvent(event); 2498 } 2499 else contextClient->sendEvent(event); 2500 } 2501 CATCH_DUMP_ATTR 2502 2503 2504 void CContext::recvFinalizeClient(CEventServer& event) 2505 TRY 2506 { 2507 CBufferIn* buffer=event.subEvents.begin()->buffer; 2508 string id; 2509 *buffer>>id; 2510 get(id)->recvFinalizeClient(*buffer); 2511 } 2512 CATCH 2513 2514 void CContext::recvFinalizeClient(CBufferIn& buffer) 2515 TRY 2516 { 2517 countChildContextFinalized_++ ; 2518 } 2519 CATCH_DUMP_ATTR 2520 2521 2522 2523 2557 2524 /*! 2558 2525 * \fn bool CContext::isFinalized(void) -
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/context.hpp
r1761 r1764 53 53 EVENT_ID_POST_PROCESS, EVENT_ID_SEND_REGISTRY, 54 54 EVENT_ID_POST_PROCESS_GLOBAL_ATTRIBUTES, 55 EVENT_ID_PROCESS_GRID_ENABLED_FIELDS 55 EVENT_ID_PROCESS_GRID_ENABLED_FIELDS, 56 EVENT_ID_CONTEXT_FINALIZE_CLIENT, 56 57 }; 57 58 … … 106 107 bool checkBuffersAndListen(bool enableEventsProcessing=true); 107 108 bool eventLoop(bool enableEventsProcessing=true); 109 void globalEventLoop(void); 108 110 109 111 // Finalize a context 110 112 void finalize(void); 113 111 114 void finalize_old(void); 112 115 bool isFinalized(void); … … 170 173 //!< after be gathered to the root process of the context, merged registry is sent to the root process of the servers 171 174 void sendRegistry(void) ; 175 void sendFinalizeClient(CContextClient* contextClient, const string& contextClientId); 176 172 177 173 178 const StdString& getIdServer(); … … 191 196 static void recvRegistry(CEventServer& event) ; 192 197 void recvRegistry(CBufferIn& buffer) ; //!< registry is received by the servers 193 198 static void recvFinalizeClient(CEventServer& event) ; 199 void recvFinalizeClient(CBufferIn& buffer); 200 194 201 void freeComms(void); //!< Free internally allcoated communicators 195 202 void releaseClientBuffers(void); //! Deallocate buffers allocated by clientContexts … … 281 288 bool allProcessed; 282 289 bool finalized; 283 int countChildC tx_; //!< Counter of child contexts (for now it is the number of secondary server pools)290 int countChildContextFinalized_; //!< Counter of child contexts (for now it is the number of secondary server pools) 284 291 StdString idServer_; 285 292 CGarbageCollector garbageCollector; -
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/field.cpp
r1761 r1764 635 635 636 636 //ym context->checkBuffersAndListen(); 637 context->eventLoop(); 637 //ym context->eventLoop(); 638 context->globalEventLoop(); 638 639 639 640 timer.suspend(); -
XIOS/dev/dev_ym/XIOS_SERVICES/src/server.cpp
r1761 r1764 711 711 // MPI_Comm_free(&intraComm); 712 712 713 CXios::finalizeDaemonsManager(); 714 713 715 if (!is_MPI_Initialized) 714 716 {
Note: See TracChangeset
for help on using the changeset viewer.