Changeset 2547 for XIOS3/trunk/src/manager/pool_ressource.cpp
- Timestamp:
- 08/29/23 17:24:04 (10 months ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS3/trunk/src/manager/pool_ressource.cpp
r2523 r2547 8 8 #include "timer.hpp" 9 9 #include "event_scheduler.hpp" 10 #include "thread_manager.hpp" 10 11 11 12 namespace xios … … 33 34 else eventScheduler_= make_shared<CEventScheduler>(poolComm) ; 34 35 freeRessourceEventScheduler_ = eventScheduler_ ; 36 std::hash<string> hashString ; 37 hashId_ = hashString("CPoolRessource::"+Id) ; 38 if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CPoolRessource::threadEventLoop, this) ; 39 } 40 41 void CPoolRessource::synchronize(void) 42 { 43 bool out=false ; 44 size_t timeLine=0 ; 45 46 eventScheduler_->registerEvent(timeLine, hashId_) ; 47 while (!out) 48 { 49 CThreadManager::yield() ; 50 out = eventScheduler_->queryEvent(timeLine,hashId_) ; 51 if (out) eventScheduler_->popEvent() ; 52 } 35 53 } 36 54 … … 122 140 MPI_Comm_rank(poolComm_, &commRank) ; 123 141 winNotify_->popFromExclusiveWindow(commRank, this, &CPoolRessource::notificationsDumpIn) ; 124 if (notifyType_==NOTIFY_CREATE_SERVICE) createService() ; 125 else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) createServiceOnto() ; 142 if (notifyType_==NOTIFY_CREATE_SERVICE) 143 { 144 if (CThreadManager::isUsingThreads()) synchronize() ; 145 createService() ; 146 } 147 else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) 148 { 149 if (CThreadManager::isUsingThreads()) synchronize() ; 150 createServiceOnto() ; 151 } 126 152 } 127 153 … … 233 259 } 234 260 CTimer::get("CPoolRessource::eventLoop").suspend(); 235 if (services_.empty() && finalizeSignal_) return true ; 236 else return false ; 237 } 261 if (services_.empty() && finalizeSignal_) finished_=true ; 262 return finished_ ; 263 } 264 265 void CPoolRessource::threadEventLoop(void) 266 { 267 CTimer::get("CPoolRessource::eventLoop").resume(); 268 info(100)<<"Launch Thread for CPoolRessource::threadEventLoop, pool id = "<<Id_<<endl ; 269 CThreadManager::threadInitialize() ; 270 271 do 272 { 273 274 double time=MPI_Wtime() ; 275 int flag ; 276 MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE); 277 if (time-lastEventLoop_ > eventLoopLatency_) 278 { 279 //checkCreateServiceNotification() ; 280 checkNotifications() ; 281 lastEventLoop_=time ; 282 } 283 284 for(auto it=services_.begin();it!=services_.end();++it) 285 { 286 if (it->second->isFinished()) 287 { 288 delete it->second ; 289 services_.erase(it) ; 290 // destroy server_context -> to do later 291 break ; 292 } ; 293 } 294 295 CTimer::get("CPoolRessource::eventLoop").suspend(); 296 if (services_.empty() && finalizeSignal_) finished_=true ; 297 298 if (!finished_) CThreadManager::yield() ; 299 300 } while (!finished_) ; 301 302 CThreadManager::threadFinalize() ; 303 info(100)<<"Close thread for CPoolRessource::threadEventLoop, pool id = "<<Id_<<endl ; 304 } 305 238 306 /* 239 307 void CPoolRessource::checkCreateServiceNotification(void) … … 347 415 } 348 416 349 void CPoolRessource::createService(MPI_Comm serviceComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients & attached417 void CPoolRessource::createService(MPI_Comm serviceComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients 350 418 { 351 419 services_[std::make_tuple(serviceId,partitionId)] = new CService(serviceComm, eventScheduler, Id_, serviceId, partitionId, type, nbPartitions) ;
Note: See TracChangeset
for help on using the changeset viewer.