Ignore:
Timestamp:
08/29/23 17:24:04 (10 months ago)
Author:
ymipsl
Message:

Major update :

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead
  • Introducing multithreading on server sided to manage more efficiently dead-lock occuring (similar to co-routine which will be available and implemented in futur c++ standard), based on c++ threads
  • Suprression of old "attached mode" which is replaced by online writer and reder filters

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/trunk/src/server.cpp

    r2535 r2547  
    2525#include "workflow_graph.hpp" 
    2626#include "release_static_allocation.hpp" 
     27#include "thread_manager.hpp" 
    2728#include <sys/stat.h> 
    2829#include <unistd.h> 
     
    6869      if (!CXios::usingOasis) 
    6970      { 
    70         if (!is_MPI_Initialized) MPI_Init(NULL, NULL); 
     71        if (!is_MPI_Initialized)  
     72        { 
     73          int required = MPI_THREAD_SERIALIZED ; 
     74          int provided ; 
     75          MPI_Init_thread(NULL,NULL, required, &provided) ; 
     76        } 
    7177        
    7278        // split the global communicator 
     
    99105      else // using OASIS 
    100106      { 
    101         if (!is_MPI_Initialized) driver_ = new CThirdPartyDriver(); 
     107         
     108        if (!is_MPI_Initialized)  
     109        { 
     110          int required = MPI_THREAD_SERIALIZED ; 
     111          int provided ; 
     112          MPI_Init_thread(NULL,NULL, required, &provided) ; 
     113        } 
     114 
     115        driver_ = new CThirdPartyDriver(); 
    102116 
    103117        driver_->getComponentCommunicator( serverComm ); 
     
    200214          { 
    201215            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 
    202             ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ; 
     216            if (CThreadManager::isUsingThreads())  
     217              while(!ressourcesManager->hasPool(CXios::defaultPoolId))  
     218              { 
     219                daemonsManager->eventLoop() ; 
     220                CThreadManager::yield() ; 
     221              } 
     222            else ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ; 
     223           
    203224            servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ; 
    204             servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ; 
     225            if (CThreadManager::isUsingThreads())  
     226              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultWriterId,0))  
     227              { 
     228                daemonsManager->eventLoop() ; 
     229                CThreadManager::yield() ; 
     230              } 
     231            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ; 
     232             
    205233            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ; 
    206             servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ; 
     234            if (CThreadManager::isUsingThreads())  
     235            { 
     236              daemonsManager->eventLoop() ; 
     237              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultReaderId, 0)) CThreadManager::yield() ; 
     238            } 
     239            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ; 
    207240          } 
    208241          else 
     
    214247            if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer; 
    215248            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 
    216             ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ; 
     249            if (CThreadManager::isUsingThreads())  
     250              while(!ressourcesManager->hasPool(CXios::defaultPoolId))  
     251              { 
     252                daemonsManager->eventLoop() ; 
     253                CThreadManager::yield() ; 
     254              } 
     255            else ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ; 
     256 
    217257            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ; 
    218             servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultGathererId) ; 
     258            if (CThreadManager::isUsingThreads())  
     259              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultGathererId,0))  
     260              { 
     261                daemonsManager->eventLoop() ; 
     262                CThreadManager::yield() ; 
     263              } 
     264            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultGathererId) ; 
     265 
    219266            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ; 
    220             servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ; 
     267            if (CThreadManager::isUsingThreads())  
     268              while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultReaderId, 0))  
     269              { 
     270                daemonsManager->eventLoop() ; 
     271                CThreadManager::yield() ; 
     272              } 
     273            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultReaderId) ; 
     274             
    221275            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ; 
    222             servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ; 
     276            if (CThreadManager::isUsingThreads()) 
     277              for(int i=0; i<nbPoolsServer2; i++) 
     278                while(!servicesManager->hasService(CXios::defaultPoolId, CXios::defaultWriterId,i))  
     279                { 
     280                  daemonsManager->eventLoop() ; 
     281                  CThreadManager::yield() ; 
     282                } 
     283            else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ; 
    223284          } 
    224285        } 
     
    233294      { 
    234295        daemonsManager->eventLoop() ; 
     296        if (CThreadManager::isUsingThreads()) CThreadManager::yield(); 
    235297        MPI_Test(&req,&ok,&status) ; 
    236298      } 
    237299 
    238300 
    239       testingEventScheduler() ; 
     301      //testingEventScheduler() ; 
    240302/* 
    241303      MPI_Request req ; 
     
    262324      { 
    263325        finished=daemonsManager->eventLoop() ; 
     326        if (CThreadManager::isUsingThreads()) CThreadManager::yield() ; 
    264327      } 
    265328      CTimer::get("XIOS event loop").suspend() ; 
Note: See TracChangeset for help on using the changeset viewer.