Changeset 2403


Ignore:
Timestamp:
09/16/22 15:20:24 (20 months ago)
Author:
ymipsl
Message:

Refactor service manager notification
YM

Location:
XIOS3/trunk/src/manager
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/trunk/src/manager/services_manager.cpp

    r2260 r2403  
    7676  } 
    7777 
     78  bool CServicesManager::createServicesOnto(const std::string& poolId, const std::string& serviceId, const std::string& OnServiceId, bool wait) 
     79  { 
     80 
     81    int leader ; 
     82    int poolSize ; 
     83     
     84    info(40)<<"CServicesManager : waiting for pool info : "<<poolId<<endl ; ; 
     85    bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ; 
     86    if (wait) 
     87    { 
     88      while (!ok)  
     89      { 
     90        CXios::getDaemonsManager()->eventLoop() ; 
     91        ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ; 
     92      } 
     93    } 
     94 
     95    if (ok)  
     96    { 
     97      info(40)<<"CServicesManager : create service on other, notification to leader "<<leader<<", serviceId : "<<serviceId<<", service onto : "<<OnServiceId<<endl ; 
     98      createServicesOntoNotify(leader, serviceId, OnServiceId) ; 
     99      return true ; 
     100    } 
     101    else return false ; 
     102  } 
    78103 
    79104  void CServicesManager::createServicesNotify(int rank, const string& serviceId, int type, int size, int nbPartitions) 
    80105  { 
    81     winNotify_->lockWindow(rank,0) ; 
    82     winNotify_->updateFromWindow(rank, this, &CServicesManager::notificationsDumpIn) ; 
    83     notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions)) ; 
    84     winNotify_->updateToWindow(rank, this, &CServicesManager::notificationsDumpOut) ; 
    85     winNotify_->unlockWindow(rank,0) ; 
     106    notifyType_=NOTIFY_CREATE_SERVICE ; 
     107    notifyCreateService_=make_tuple(serviceId, type, size, nbPartitions ) ; 
     108    sendNotification(rank) ; 
     109  } 
     110 
     111 
     112  void CServicesManager::createServicesOntoNotify(int rank, const string& serviceId, const string& OnServiceId) 
     113  { 
     114    notifyType_=NOTIFY_CREATE_SERVICE_ONTO ; 
     115    notifyCreateServiceOnto_=make_tuple(serviceId, OnServiceId) ; 
     116    sendNotification(rank) ; 
     117  } 
     118 
     119  void CServicesManager::sendNotification(int rank) 
     120  { 
     121    winNotify_->lockWindowExclusive(rank) ; 
     122    winNotify_->pushToLockedWindow(rank, this, &CServicesManager::notificationsDumpOut) ; 
     123    winNotify_->unlockWindow(rank) ; 
    86124  } 
    87125 
    88126   
    89   void CServicesManager::checkCreateServicesNotification(void) 
    90   { 
    91     int commRank ; 
    92     MPI_Comm_rank(xiosComm_,&commRank) ; 
    93     winNotify_->lockWindow(commRank,0) ; 
    94     winNotify_->updateFromWindow(commRank, this, &CServicesManager::notificationsDumpIn) ; 
    95      
    96     if (!notifications_.empty()) 
    97     { 
    98       auto info = notifications_.front() ; 
    99       xios::info(40)<<"CServicesManager : receive create service notification : "<<get<0>(info)<<endl ; 
    100       CServer::getServersRessource()->getPoolRessource()->createService(get<0>(info), get<1>(info), get<2>(info), get<3>(info)) ; 
    101       notifications_.pop_front() ; 
    102       winNotify_->updateToWindow(commRank, this, &CServicesManager::notificationsDumpOut) ;      
    103     } 
    104     winNotify_->unlockWindow(commRank,0) ; 
    105  
    106   } 
    107  
    108127  void CServicesManager::eventLoop(void) 
    109128  { 
     
    114133    if (time-lastEventLoop_ > eventLoopLatency_)  
    115134    { 
    116       checkCreateServicesNotification() ; 
     135      checkNotifications() ; 
    117136      lastEventLoop_=time ; 
    118137    } 
     
    120139  } 
    121140 
    122    
     141 
     142 
     143  void CServicesManager::checkNotifications(void) 
     144  { 
     145    int commRank ; 
     146    MPI_Comm_rank(xiosComm_, &commRank) ; 
     147    winNotify_->lockWindowExclusive(commRank) ; 
     148    winNotify_->popFromLockedWindow(commRank, this, &CServicesManager::notificationsDumpIn) ; 
     149    winNotify_->unlockWindow(commRank) ; 
     150    if (notifyType_==NOTIFY_CREATE_SERVICE) createService() ; 
     151    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) createServiceOnto() ; 
     152  } 
     153 
     154  void CServicesManager::createService(void) 
     155  { 
     156    auto& arg=notifyCreateService_ ; 
     157    CServer::getServersRessource()->getPoolRessource()->createService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg)) ; 
     158  } 
     159 
     160  void CServicesManager::createServiceOnto(void) 
     161  { 
     162    auto& arg=notifyCreateService_ ; 
     163    //CServer::getServersRessource()->getPoolRessource()->createService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg)) ; 
     164  } 
     165 
    123166  void CServicesManager::notificationsDumpOut(CBufferOut& buffer) 
    124167  { 
    125      
     168 
    126169    buffer.realloc(maxBufferSize_) ; 
    127     
    128     buffer<<(int)notifications_.size(); 
    129      
    130     for(auto it=notifications_.begin();it!=notifications_.end(); ++it)  
    131       buffer << std::get<0>(*it) << static_cast<int>(std::get<1>(*it))<< std::get<2>(*it) << std::get<3>(*it)  ; 
     170     
     171    if (notifyType_==NOTIFY_CREATE_SERVICE) 
     172    { 
     173      auto& arg=notifyCreateService_ ; 
     174      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) ; 
     175    } 
     176    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) 
     177    { 
     178      auto& arg=notifyCreateServiceOnto_ ; 
     179      buffer << notifyType_<< get<0>(arg) << get<1>(arg)  ; 
     180    } 
    132181  } 
    133182 
    134183  void CServicesManager::notificationsDumpIn(CBufferIn& buffer) 
    135184  { 
    136     std::string id ; 
    137     int type ; 
    138     int size;  
    139     int nbPartitions ; 
    140  
    141     notifications_.clear() ; 
    142     int nbNotifications ; 
    143     buffer>>nbNotifications ; 
    144     for(int i=0;i<nbNotifications;i++)  
    145     { 
    146       buffer>>id>>type>>size>>nbPartitions ; 
    147       notifications_.push_back(std::make_tuple(id,type,size,nbPartitions)) ; 
    148     } 
    149   } 
    150  
     185    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ; 
     186    else 
     187    { 
     188      buffer>>notifyType_; 
     189      if (notifyType_==NOTIFY_CREATE_SERVICE) 
     190      { 
     191        auto& arg=notifyCreateService_ ; 
     192        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg) ; 
     193      } 
     194      else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) 
     195      { 
     196        auto& arg=notifyCreateServiceOnto_ ; 
     197        buffer >> get<0>(arg) >> get<1>(arg) ; 
     198      } 
     199    } 
     200  }   
    151201   
    152202  void CServicesManager::servicesDumpOut(CBufferOut& buffer) 
     
    198248    winServices_->unlockWindow(managerGlobalLeader_) ; 
    199249 
    200 /* 
    201     winServices_->lockWindow(managerGlobalLeader_,0) ; 
    202     winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 
    203     services_[std::tuple<std::string, std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ; 
    204     winServices_->updateToWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpOut) ; 
    205     winServices_->unlockWindow(managerGlobalLeader_,0) ;*/ 
    206250  } 
    207251 
     
    213257    winServices_->updateFromLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 
    214258    winServices_->unlockWindow(managerGlobalLeader_) ; 
    215 /* 
    216     winServices_->lockWindow(managerGlobalLeader_,0) ; 
    217     winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 
    218     winServices_->unlockWindow(managerGlobalLeader_,0) ;*/ 
    219259 
    220260    auto it=services_.find(std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)) ; 
  • XIOS3/trunk/src/manager/services_manager.hpp

    r2335 r2403  
    2121    static const int CLIENT=0 ; 
    2222    static const int GATHERER=1 ; 
    23     static const int IO_SERVER=2 ; 
    24     static const int OUT_SERVER=3 ; 
    25     static const int ALL_SERVICES=4 ; 
     23    static const int WRITER=2 ; 
     24    static const int READER=3 ; 
     25    static const int IO_SERVER=4 ; 
     26    static const int OUT_SERVER=5 ; 
     27    static const int ALL_SERVICES=6 ; 
    2628 
     29    private: 
     30    const int NOTIFY_NOTHING=0 ; 
     31    const int NOTIFY_CREATE_SERVICE=1 ; 
     32    const int NOTIFY_CREATE_SERVICE_ONTO=2 ; 
     33     
    2734    public: 
    2835     
     
    3138     
    3239    bool createServices(const std::string& poolId, const std::string& serviceId, int type, int size, int nbPartition, bool wait=true) ; 
    33     void createServicesNotify(int rank, const string& serviceId, int type, int size, int nbPartitions) ; 
    34     void checkCreateServicesNotification(void) ; 
     40    bool createServicesOnto(const std::string& poolId, const std::string& serviceId, const std::string& onServiceId, bool wait=true) ; 
     41     
    3542    void eventLoop(void) ; 
    36     void notificationsDumpOut(CBufferOut& buffer) ; 
    37     void notificationsDumpIn(CBufferIn& buffer) ; 
    3843     
    3944    void registerService(const std::string& poolId, const std::string& serviceId, const int& partitionId, int type, int size, int nbPartitions, int leader) ; 
     
    4550    void servicesDumpOut(CBufferOut& buffer) ; 
    4651    void servicesDumpIn(CBufferIn& buffer) ; 
    47      
     52 
     53    private: 
     54 
     55    void createService(void) ; 
     56    void createServiceOnto(void) ;     
     57    void createServicesNotify(int rank, const string& serviceId, int type, int size, int nbPartitions) ; 
     58    void createServicesOntoNotify(int rank, const string& serviceId, const string& OnServiceId) ; 
     59    void sendNotification(int rank) ; 
     60    void checkNotifications(void) ; 
     61    void notificationsDumpOut(CBufferOut& buffer) ; 
     62    void notificationsDumpIn(CBufferIn& buffer) ; 
     63 
    4864 
    4965    private : 
     
    5571    MPI_Comm xiosComm_ ; 
    5672 
    57     std::list<std::tuple<std::string, int, int, int> > notifications_; 
    58      
     73    int notifyType_ ; 
     74    tuple<std::string, int, int, int> notifyCreateService_ ; 
     75    tuple<std::string, std::string> notifyCreateServiceOnto_ ; 
     76    
    5977    std::map<tuple<std::string, std::string, int>, std::tuple<int, int, int, int> > services_ ; 
    6078 
Note: See TracChangeset for help on using the changeset viewer.