Ignore:
Timestamp:
08/29/23 17:24:04 (10 months ago)
Author:
ymipsl
Message:

Major update :

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead
  • Introducing multithreading on server sided to manage more efficiently dead-lock occuring (similar to co-routine which will be available and implemented in futur c++ standard), based on c++ threads
  • Suprression of old "attached mode" which is replaced by online writer and reder filters

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/trunk/src/node/context.cpp

    r2507 r2547  
    3434#include "services.hpp" 
    3535#include "contexts_manager.hpp" 
     36#include "thread_manager.hpp" 
    3637#include <chrono> 
    3738#include <random> 
     
    488489      } 
    489490      contextId_ = getId() ; 
    490        
    491       attached_mode=true ; 
    492       if (!CXios::isUsingServer()) attached_mode=false ; 
    493  
    494491 
    495492      string contextRegistryId=getId() ; 
     
    544541    MPI_Comm_dup(intraComm_, &intraCommClient); 
    545542    comms.push_back(intraCommClient); 
    546     // attached_mode=parentServerContext_->isAttachedMode() ; //ym probably inherited from source context 
    547543 
    548544    CContextServer* server ; 
     
    578574    if (commRank==0) 
    579575    { 
    580       CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions, true) ; 
    581       for(int i=0 ; i<nbPartitions; i++) CXios::getContextsManager()->createServerContext(poolId, serverId, i, getContextId()) ; 
    582     } 
    583     setCurrent(getId()) ; // getCurrent/setCurrent may be supress, it can cause a lot of trouble (attached ???) 
     576      while (! CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions)) yield() ; 
     577      for(int i=0 ; i<nbPartitions; i++)   
     578        while (!CXios::getContextsManager()->createServerContext(poolId, serverId, i, getContextId())) yield() ; 
     579    } 
     580    synchronize() ; 
     581    setCurrent(getId()) ; // getCurrent/setCurrent may be supress, it can cause a lot of trouble  
    584582    MPI_Bcast(&nbPartitions, 1, MPI_INT, 0, intraComm_) ; 
    585583       
     
    587585    for(int i=0 ; i<nbPartitions; i++) 
    588586    { 
    589       parentServerContext_->createIntercomm(poolId, serverId, i, getContextId(), intraComm_, interCommClient, interCommServer) ; 
     587      while (!parentServerContext_->createIntercomm(poolId, serverId, i, getContextId(), intraComm_, interCommClient, interCommServer)) yield() ; 
    590588      int type ;  
    591       if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type, true) ; 
     589      if (commRank==0) while (!CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type)) yield(); 
     590      synchronize() ; 
    592591      MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 
     592 
    593593      string fullServerId=CXios::getContextsManager()->getServerContextName(poolId, serverId, i, type, getContextId()) ; 
    594594 
     
    620620    if (serviceType_ == CServicesManager::CLIENT) 
    621621    { 
    622       if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultWriterId, clientServers) ; 
    623       else if (CXios::usingServer2) createServerInterComm(CXios::defaultPoolId, CXios::defaultGathererId, clientServers) ; 
     622      if (CXios::usingServer2) createServerInterComm(CXios::defaultPoolId, CXios::defaultGathererId, clientServers) ; 
    624623      else createServerInterComm(CXios::defaultPoolId, CXios::defaultWriterId, clientServers) ; 
    625624       
     
    629628      clientServers.clear() ; 
    630629    
    631       if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultReaderId, clientServers) ; 
    632       else createServerInterComm(CXios::defaultPoolId, CXios::defaultReaderId, clientServers) ; 
     630      createServerInterComm(CXios::defaultPoolId, CXios::defaultReaderId, clientServers) ; 
    633631      readerClientOut_.push_back(clientServers[0].second.first) ;  
    634632      readerServerOut_.push_back(clientServers[0].second.second) ; 
     
    658656    else 
    659657    { 
    660       if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+serviceId, retClientServers) ; 
    661       else createServerInterComm(poolId, serviceId, retClientServers) ; 
     658      createServerInterComm(poolId, serviceId, retClientServers) ; 
    662659      for(auto& retClientServer : retClientServers)  clientServers.push_back(retClientServer.second) ; 
    663660       
    664661      int serviceType ; 
    665       if (intraCommRank_==0) CXios::getServicesManager()->getServiceType(poolId, serviceId, 0, serviceType, true) ; 
     662      if (intraCommRank_==0) while(!CXios::getServicesManager()->getServiceType(poolId, serviceId, 0, serviceType)) yield();  
     663      synchronize() ; 
    666664      MPI_Bcast(&serviceType,1,MPI_INT,0,intraComm_) ; 
    667665       
     
    694692  void CContext::globalEventLoop(void) 
    695693  { 
    696     lockContext() ; 
    697     CXios::getDaemonsManager()->eventLoop() ; 
    698     unlockContext() ; 
    699     setCurrent(getId()) ; 
     694    if (CThreadManager::isUsingThreads()) CThreadManager::yield(); 
     695    else 
     696    { 
     697      lockContext() ; 
     698      CXios::getDaemonsManager()->eventLoop() ; 
     699      unlockContext() ; 
     700      setCurrent(getId()) ; 
     701    } 
    700702  } 
    701703 
     704  void CContext::yield(void) 
     705  { 
     706    if (CThreadManager::isUsingThreads())  
     707    { 
     708      CThreadManager::yield(); 
     709      setCurrent(getId()) ; 
     710    } 
     711    else 
     712    { 
     713      lockContext() ; 
     714      CXios::getDaemonsManager()->eventLoop() ; 
     715      unlockContext() ; 
     716      setCurrent(getId()) ; 
     717    } 
     718  } 
     719 
     720  void CContext::synchronize(void) 
     721  { 
     722    bool out, finished;  
     723    size_t timeLine=timeLine_ ; 
     724       
     725    timeLine_++ ; 
     726    eventScheduler_->registerEvent(timeLine, hashId_) ; 
     727       
     728    out = eventScheduler_->queryEvent(timeLine,hashId_) ; 
     729    if (out) eventScheduler_->popEvent() ; 
     730    while (!out) 
     731    { 
     732      yield() ; 
     733      out = eventScheduler_->queryEvent(timeLine,hashId_) ; 
     734      if (out) eventScheduler_->popEvent() ; 
     735    } 
     736  } 
     737   
    702738  bool CContext::scheduledEventLoop(bool enableEventsProcessing)  
    703739  { 
     
    761797       if (couplerOutClient_.find(fullContextId)==couplerOutClient_.end())  
    762798       { 
    763          bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ; 
     799         while(!CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm())) yield(); 
     800         synchronize() ; 
    764801      
    765802         MPI_Comm interComm, interCommClient, interCommServer  ; 
    766803         MPI_Comm intraCommClient, intraCommServer ; 
    767804 
    768          if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ; 
     805         MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ; 
    769806 
    770807        MPI_Comm_dup(intraComm_, &intraCommClient) ; 
     
    783820    else if (couplerInClient_.find(fullContextId)==couplerInClient_.end()) 
    784821    { 
    785       bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ; 
     822      while(!CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm())) yield() ; 
     823      synchronize() ; 
    786824      
    787825       MPI_Comm interComm, interCommClient, interCommServer  ; 
    788826       MPI_Comm intraCommClient, intraCommServer ; 
    789827 
    790        if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ; 
     828       MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ; 
    791829 
    792830       MPI_Comm_dup(intraComm_, &intraCommClient) ; 
     
    824862          couplersInFinalized=true ; 
    825863          for(auto& couplerOutClient : couplerOutClient_) couplersInFinalized &= isCouplerInContextFinalized(couplerOutClient.second) ;  
    826           globalEventLoop() ; 
     864          if (CThreadManager::isUsingThreads()) yield() ; 
     865          else globalEventLoop() ; 
    827866        } while (!couplersInFinalized) ; 
    828867 
     
    900939          info(100)<<"DEBUG: context "<<getId()<<" release client reader ok"<<endl ; 
    901940        } 
     941        closeAllFile() ; 
    902942      } 
    903943      else if (serviceType_==CServicesManager::GATHERER) 
    904944      { 
    905          for(auto& client : writerClientOut_) 
    906          { 
    907            client->finalize(); 
    908            bool bufferReleased; 
    909            do 
    910            { 
    911              client->eventLoop(); 
    912              bufferReleased = !client->havePendingRequests(); 
    913            } while (!bufferReleased); 
     945        CContextClient* client ; 
     946        CContextServer* server ; 
     947         
     948        for(int n=0; n<writerClientOut_.size() ; n++) 
     949        { 
     950          client=writerClientOut_[n] ; 
     951          server=writerServerOut_[n] ; 
     952         
     953          client->finalize(); 
     954          bool bufferReleased; 
     955          do 
     956          { 
     957            client->eventLoop(); 
     958            bufferReleased = !client->havePendingRequests(); 
     959          } while (!bufferReleased); 
    914960            
    915            bool notifiedFinalized=false ; 
    916            do 
    917            { 
    918              notifiedFinalized=client->isNotifiedFinalized() ; 
    919            } while (!notifiedFinalized) ; 
    920            client->releaseBuffers(); 
     961          bool notifiedFinalized=false ; 
     962          do 
     963          { 
     964            notifiedFinalized=client->isNotifiedFinalized() ; 
     965          } while (!notifiedFinalized) ; 
     966          server->releaseBuffers(); 
     967          client->releaseBuffers(); 
    921968         } 
    922969         closeAllFile(); 
     970         writerClientIn_[0]->releaseBuffers(); 
     971         writerServerIn_[0]->releaseBuffers();          
    923972         //ym writerClientIn & writerServerIn not released here ==> to check !! 
    924973      } 
     
    9571006   void CContext::setDefaultServices(void) 
    9581007   { 
    959      defaultPoolWriterId_ = CXios::defaultPoolId ; 
    960      defaultPoolReaderId_ = CXios::defaultPoolId ; 
    961      defaultPoolGathererId_ = CXios::defaultPoolId ; 
    962      defaultWriterId_ = CXios::defaultWriterId ; 
    963      defaultReaderId_ = CXios::defaultReaderId ; 
    964      defaultGathererId_ = CXios::defaultGathererId ; 
    965      defaultUsingServer2_ = CXios::usingServer2 ; 
     1008     if (!CXios::isUsingServer()) 
     1009     { 
     1010       defaultPoolWriterId_ = CXios::defaultPoolId ; 
     1011       defaultPoolReaderId_ = CXios::defaultPoolId ; 
     1012       defaultPoolGathererId_ = CXios::defaultPoolId ; 
     1013       defaultWriterId_ = "attached" ; 
     1014       defaultReaderId_ = "attached" ; 
     1015       defaultGathererId_ =  "attached" ; 
     1016       defaultUsingServer2_ = false; 
     1017     } 
     1018     else 
     1019     { 
     1020       defaultPoolWriterId_ = CXios::defaultPoolId ; 
     1021       defaultPoolReaderId_ = CXios::defaultPoolId ; 
     1022       defaultPoolGathererId_ = CXios::defaultPoolId ; 
     1023       defaultWriterId_ = CXios::defaultWriterId ; 
     1024       defaultReaderId_ = CXios::defaultReaderId ; 
     1025       defaultGathererId_ = CXios::defaultGathererId ; 
     1026       defaultUsingServer2_ = CXios::usingServer2 ; 
    9661027      
    967      if (!default_pool.isEmpty())  defaultPoolWriterId_ = defaultPoolReaderId_= defaultPoolGathererId_= default_pool ; 
    968      if (!default_pool_writer.isEmpty()) defaultPoolWriterId_ = default_pool_writer ; 
    969      if (!default_pool_reader.isEmpty()) defaultPoolReaderId_ = default_pool_reader ; 
    970      if (!default_pool_gatherer.isEmpty()) defaultPoolGathererId_ = default_pool_gatherer ; 
    971      if (!default_writer.isEmpty()) defaultWriterId_ = default_writer ; 
    972      if (!default_reader.isEmpty()) defaultWriterId_ = default_reader ; 
    973      if (!default_gatherer.isEmpty()) defaultGathererId_ = default_gatherer ; 
    974      if (!default_using_server2.isEmpty()) defaultUsingServer2_ = default_using_server2 ; 
     1028       if (!default_pool.isEmpty())  defaultPoolWriterId_ = defaultPoolReaderId_= defaultPoolGathererId_= default_pool ; 
     1029       if (!default_pool_writer.isEmpty()) defaultPoolWriterId_ = default_pool_writer ; 
     1030       if (!default_pool_reader.isEmpty()) defaultPoolReaderId_ = default_pool_reader ; 
     1031       if (!default_pool_gatherer.isEmpty()) defaultPoolGathererId_ = default_pool_gatherer ; 
     1032       if (!default_writer.isEmpty()) defaultWriterId_ = default_writer ; 
     1033       if (!default_reader.isEmpty()) defaultWriterId_ = default_reader ; 
     1034       if (!default_gatherer.isEmpty()) defaultGathererId_ = default_gatherer ; 
     1035       if (!default_using_server2.isEmpty()) defaultUsingServer2_ = default_using_server2 ; 
     1036     } 
    9751037   } 
    9761038 
Note: See TracChangeset for help on using the changeset viewer.