Ignore:
Timestamp:
08/29/23 17:24:04 (10 months ago)
Author:
ymipsl
Message:

Major update :

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead
  • Introducing multithreading on server sided to manage more efficiently dead-lock occuring (similar to co-routine which will be available and implemented in futur c++ standard), based on c++ threads
  • Suprression of old "attached mode" which is replaced by online writer and reder filters

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/trunk/src/manager/pool_ressource.cpp

    r2523 r2547  
    88#include "timer.hpp" 
    99#include "event_scheduler.hpp" 
     10#include "thread_manager.hpp" 
    1011 
    1112namespace xios 
     
    3334    else eventScheduler_= make_shared<CEventScheduler>(poolComm) ; 
    3435    freeRessourceEventScheduler_ = eventScheduler_ ; 
     36    std::hash<string> hashString ; 
     37    hashId_ = hashString("CPoolRessource::"+Id) ; 
     38    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CPoolRessource::threadEventLoop, this) ; 
     39  } 
     40 
     41  void CPoolRessource::synchronize(void) 
     42  { 
     43    bool out=false ;  
     44    size_t timeLine=0 ; 
     45           
     46    eventScheduler_->registerEvent(timeLine, hashId_) ; 
     47    while (!out) 
     48    { 
     49      CThreadManager::yield() ; 
     50      out = eventScheduler_->queryEvent(timeLine,hashId_) ; 
     51      if (out) eventScheduler_->popEvent() ; 
     52    } 
    3553  } 
    3654 
     
    122140    MPI_Comm_rank(poolComm_, &commRank) ; 
    123141    winNotify_->popFromExclusiveWindow(commRank, this, &CPoolRessource::notificationsDumpIn) ; 
    124     if (notifyType_==NOTIFY_CREATE_SERVICE) createService() ; 
    125     else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) createServiceOnto() ; 
     142    if (notifyType_==NOTIFY_CREATE_SERVICE)  
     143    { 
     144      if (CThreadManager::isUsingThreads()) synchronize() ; 
     145      createService() ; 
     146    } 
     147    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)  
     148    { 
     149      if (CThreadManager::isUsingThreads()) synchronize() ; 
     150      createServiceOnto() ; 
     151    } 
    126152  } 
    127153 
     
    233259    } 
    234260    CTimer::get("CPoolRessource::eventLoop").suspend(); 
    235     if (services_.empty() && finalizeSignal_) return true ; 
    236     else return false ; 
    237   } 
     261    if (services_.empty() && finalizeSignal_) finished_=true ; 
     262    return finished_ ; 
     263  } 
     264 
     265  void CPoolRessource::threadEventLoop(void) 
     266  { 
     267    CTimer::get("CPoolRessource::eventLoop").resume(); 
     268    info(100)<<"Launch Thread for  CPoolRessource::threadEventLoop, pool id = "<<Id_<<endl ; 
     269    CThreadManager::threadInitialize() ;  
     270     
     271    do 
     272    { 
     273 
     274      double time=MPI_Wtime() ; 
     275      int flag ; 
     276      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE); 
     277      if (time-lastEventLoop_ > eventLoopLatency_)  
     278      { 
     279        //checkCreateServiceNotification() ; 
     280        checkNotifications() ; 
     281        lastEventLoop_=time ; 
     282      } 
     283     
     284      for(auto it=services_.begin();it!=services_.end();++it)  
     285      { 
     286        if (it->second->isFinished()) 
     287        { 
     288          delete it->second ;  
     289          services_.erase(it) ; 
     290          // destroy server_context -> to do later 
     291          break ; 
     292        } ; 
     293      } 
     294 
     295      CTimer::get("CPoolRessource::eventLoop").suspend(); 
     296      if (services_.empty() && finalizeSignal_) finished_=true ; 
     297       
     298      if (!finished_) CThreadManager::yield() ; 
     299     
     300    } while (!finished_) ; 
     301 
     302    CThreadManager::threadFinalize() ; 
     303    info(100)<<"Close thread for  CPoolRessource::threadEventLoop, pool id = "<<Id_<<endl ; 
     304  } 
     305 
    238306/* 
    239307  void CPoolRessource::checkCreateServiceNotification(void) 
     
    347415  } 
    348416 
    349   void CPoolRessource::createService(MPI_Comm serviceComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients & attached 
     417  void CPoolRessource::createService(MPI_Comm serviceComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients 
    350418  { 
    351419    services_[std::make_tuple(serviceId,partitionId)] = new CService(serviceComm, eventScheduler, Id_, serviceId, partitionId, type, nbPartitions) ; 
Note: See TracChangeset for help on using the changeset viewer.