Changeset 2547 for XIOS3/trunk/src/node
- Timestamp:
- 08/29/23 17:24:04 (11 months ago)
- Location:
- XIOS3/trunk/src/node
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS3/trunk/src/node/context.cpp
r2507 r2547 34 34 #include "services.hpp" 35 35 #include "contexts_manager.hpp" 36 #include "thread_manager.hpp" 36 37 #include <chrono> 37 38 #include <random> … … 488 489 } 489 490 contextId_ = getId() ; 490 491 attached_mode=true ;492 if (!CXios::isUsingServer()) attached_mode=false ;493 494 491 495 492 string contextRegistryId=getId() ; … … 544 541 MPI_Comm_dup(intraComm_, &intraCommClient); 545 542 comms.push_back(intraCommClient); 546 // attached_mode=parentServerContext_->isAttachedMode() ; //ym probably inherited from source context547 543 548 544 CContextServer* server ; … … 578 574 if (commRank==0) 579 575 { 580 CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions, true) ; 581 for(int i=0 ; i<nbPartitions; i++) CXios::getContextsManager()->createServerContext(poolId, serverId, i, getContextId()) ; 582 } 583 setCurrent(getId()) ; // getCurrent/setCurrent may be supress, it can cause a lot of trouble (attached ???) 576 while (! CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions)) yield() ; 577 for(int i=0 ; i<nbPartitions; i++) 578 while (!CXios::getContextsManager()->createServerContext(poolId, serverId, i, getContextId())) yield() ; 579 } 580 synchronize() ; 581 setCurrent(getId()) ; // getCurrent/setCurrent may be supress, it can cause a lot of trouble 584 582 MPI_Bcast(&nbPartitions, 1, MPI_INT, 0, intraComm_) ; 585 583 … … 587 585 for(int i=0 ; i<nbPartitions; i++) 588 586 { 589 parentServerContext_->createIntercomm(poolId, serverId, i, getContextId(), intraComm_, interCommClient, interCommServer) ;587 while (!parentServerContext_->createIntercomm(poolId, serverId, i, getContextId(), intraComm_, interCommClient, interCommServer)) yield() ; 590 588 int type ; 591 if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type, true) ; 589 if (commRank==0) while (!CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type)) yield(); 590 synchronize() ; 592 591 MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 592 593 593 string fullServerId=CXios::getContextsManager()->getServerContextName(poolId, serverId, i, type, getContextId()) ; 594 594 … … 620 620 if (serviceType_ == CServicesManager::CLIENT) 621 621 { 622 if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultWriterId, clientServers) ; 623 else if (CXios::usingServer2) createServerInterComm(CXios::defaultPoolId, CXios::defaultGathererId, clientServers) ; 622 if (CXios::usingServer2) createServerInterComm(CXios::defaultPoolId, CXios::defaultGathererId, clientServers) ; 624 623 else createServerInterComm(CXios::defaultPoolId, CXios::defaultWriterId, clientServers) ; 625 624 … … 629 628 clientServers.clear() ; 630 629 631 if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+CXios::defaultReaderId, clientServers) ; 632 else createServerInterComm(CXios::defaultPoolId, CXios::defaultReaderId, clientServers) ; 630 createServerInterComm(CXios::defaultPoolId, CXios::defaultReaderId, clientServers) ; 633 631 readerClientOut_.push_back(clientServers[0].second.first) ; 634 632 readerServerOut_.push_back(clientServers[0].second.second) ; … … 658 656 else 659 657 { 660 if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+serviceId, retClientServers) ; 661 else createServerInterComm(poolId, serviceId, retClientServers) ; 658 createServerInterComm(poolId, serviceId, retClientServers) ; 662 659 for(auto& retClientServer : retClientServers) clientServers.push_back(retClientServer.second) ; 663 660 664 661 int serviceType ; 665 if (intraCommRank_==0) CXios::getServicesManager()->getServiceType(poolId, serviceId, 0, serviceType, true) ; 662 if (intraCommRank_==0) while(!CXios::getServicesManager()->getServiceType(poolId, serviceId, 0, serviceType)) yield(); 663 synchronize() ; 666 664 MPI_Bcast(&serviceType,1,MPI_INT,0,intraComm_) ; 667 665 … … 694 692 void CContext::globalEventLoop(void) 695 693 { 696 lockContext() ; 697 CXios::getDaemonsManager()->eventLoop() ; 698 unlockContext() ; 699 setCurrent(getId()) ; 694 if (CThreadManager::isUsingThreads()) CThreadManager::yield(); 695 else 696 { 697 lockContext() ; 698 CXios::getDaemonsManager()->eventLoop() ; 699 unlockContext() ; 700 setCurrent(getId()) ; 701 } 700 702 } 701 703 704 void CContext::yield(void) 705 { 706 if (CThreadManager::isUsingThreads()) 707 { 708 CThreadManager::yield(); 709 setCurrent(getId()) ; 710 } 711 else 712 { 713 lockContext() ; 714 CXios::getDaemonsManager()->eventLoop() ; 715 unlockContext() ; 716 setCurrent(getId()) ; 717 } 718 } 719 720 void CContext::synchronize(void) 721 { 722 bool out, finished; 723 size_t timeLine=timeLine_ ; 724 725 timeLine_++ ; 726 eventScheduler_->registerEvent(timeLine, hashId_) ; 727 728 out = eventScheduler_->queryEvent(timeLine,hashId_) ; 729 if (out) eventScheduler_->popEvent() ; 730 while (!out) 731 { 732 yield() ; 733 out = eventScheduler_->queryEvent(timeLine,hashId_) ; 734 if (out) eventScheduler_->popEvent() ; 735 } 736 } 737 702 738 bool CContext::scheduledEventLoop(bool enableEventsProcessing) 703 739 { … … 761 797 if (couplerOutClient_.find(fullContextId)==couplerOutClient_.end()) 762 798 { 763 bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ; 799 while(!CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm())) yield(); 800 synchronize() ; 764 801 765 802 MPI_Comm interComm, interCommClient, interCommServer ; 766 803 MPI_Comm intraCommClient, intraCommServer ; 767 804 768 if (ok)MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ;805 MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ; 769 806 770 807 MPI_Comm_dup(intraComm_, &intraCommClient) ; … … 783 820 else if (couplerInClient_.find(fullContextId)==couplerInClient_.end()) 784 821 { 785 bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ; 822 while(!CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm())) yield() ; 823 synchronize() ; 786 824 787 825 MPI_Comm interComm, interCommClient, interCommServer ; 788 826 MPI_Comm intraCommClient, intraCommServer ; 789 827 790 if (ok)MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ;828 MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ; 791 829 792 830 MPI_Comm_dup(intraComm_, &intraCommClient) ; … … 824 862 couplersInFinalized=true ; 825 863 for(auto& couplerOutClient : couplerOutClient_) couplersInFinalized &= isCouplerInContextFinalized(couplerOutClient.second) ; 826 globalEventLoop() ; 864 if (CThreadManager::isUsingThreads()) yield() ; 865 else globalEventLoop() ; 827 866 } while (!couplersInFinalized) ; 828 867 … … 900 939 info(100)<<"DEBUG: context "<<getId()<<" release client reader ok"<<endl ; 901 940 } 941 closeAllFile() ; 902 942 } 903 943 else if (serviceType_==CServicesManager::GATHERER) 904 944 { 905 for(auto& client : writerClientOut_) 906 { 907 client->finalize(); 908 bool bufferReleased; 909 do 910 { 911 client->eventLoop(); 912 bufferReleased = !client->havePendingRequests(); 913 } while (!bufferReleased); 945 CContextClient* client ; 946 CContextServer* server ; 947 948 for(int n=0; n<writerClientOut_.size() ; n++) 949 { 950 client=writerClientOut_[n] ; 951 server=writerServerOut_[n] ; 952 953 client->finalize(); 954 bool bufferReleased; 955 do 956 { 957 client->eventLoop(); 958 bufferReleased = !client->havePendingRequests(); 959 } while (!bufferReleased); 914 960 915 bool notifiedFinalized=false ; 916 do 917 { 918 notifiedFinalized=client->isNotifiedFinalized() ; 919 } while (!notifiedFinalized) ; 920 client->releaseBuffers(); 961 bool notifiedFinalized=false ; 962 do 963 { 964 notifiedFinalized=client->isNotifiedFinalized() ; 965 } while (!notifiedFinalized) ; 966 server->releaseBuffers(); 967 client->releaseBuffers(); 921 968 } 922 969 closeAllFile(); 970 writerClientIn_[0]->releaseBuffers(); 971 writerServerIn_[0]->releaseBuffers(); 923 972 //ym writerClientIn & writerServerIn not released here ==> to check !! 924 973 } … … 957 1006 void CContext::setDefaultServices(void) 958 1007 { 959 defaultPoolWriterId_ = CXios::defaultPoolId ; 960 defaultPoolReaderId_ = CXios::defaultPoolId ; 961 defaultPoolGathererId_ = CXios::defaultPoolId ; 962 defaultWriterId_ = CXios::defaultWriterId ; 963 defaultReaderId_ = CXios::defaultReaderId ; 964 defaultGathererId_ = CXios::defaultGathererId ; 965 defaultUsingServer2_ = CXios::usingServer2 ; 1008 if (!CXios::isUsingServer()) 1009 { 1010 defaultPoolWriterId_ = CXios::defaultPoolId ; 1011 defaultPoolReaderId_ = CXios::defaultPoolId ; 1012 defaultPoolGathererId_ = CXios::defaultPoolId ; 1013 defaultWriterId_ = "attached" ; 1014 defaultReaderId_ = "attached" ; 1015 defaultGathererId_ = "attached" ; 1016 defaultUsingServer2_ = false; 1017 } 1018 else 1019 { 1020 defaultPoolWriterId_ = CXios::defaultPoolId ; 1021 defaultPoolReaderId_ = CXios::defaultPoolId ; 1022 defaultPoolGathererId_ = CXios::defaultPoolId ; 1023 defaultWriterId_ = CXios::defaultWriterId ; 1024 defaultReaderId_ = CXios::defaultReaderId ; 1025 defaultGathererId_ = CXios::defaultGathererId ; 1026 defaultUsingServer2_ = CXios::usingServer2 ; 966 1027 967 if (!default_pool.isEmpty()) defaultPoolWriterId_ = defaultPoolReaderId_= defaultPoolGathererId_= default_pool ; 968 if (!default_pool_writer.isEmpty()) defaultPoolWriterId_ = default_pool_writer ; 969 if (!default_pool_reader.isEmpty()) defaultPoolReaderId_ = default_pool_reader ; 970 if (!default_pool_gatherer.isEmpty()) defaultPoolGathererId_ = default_pool_gatherer ; 971 if (!default_writer.isEmpty()) defaultWriterId_ = default_writer ; 972 if (!default_reader.isEmpty()) defaultWriterId_ = default_reader ; 973 if (!default_gatherer.isEmpty()) defaultGathererId_ = default_gatherer ; 974 if (!default_using_server2.isEmpty()) defaultUsingServer2_ = default_using_server2 ; 1028 if (!default_pool.isEmpty()) defaultPoolWriterId_ = defaultPoolReaderId_= defaultPoolGathererId_= default_pool ; 1029 if (!default_pool_writer.isEmpty()) defaultPoolWriterId_ = default_pool_writer ; 1030 if (!default_pool_reader.isEmpty()) defaultPoolReaderId_ = default_pool_reader ; 1031 if (!default_pool_gatherer.isEmpty()) defaultPoolGathererId_ = default_pool_gatherer ; 1032 if (!default_writer.isEmpty()) defaultWriterId_ = default_writer ; 1033 if (!default_reader.isEmpty()) defaultWriterId_ = default_reader ; 1034 if (!default_gatherer.isEmpty()) defaultGathererId_ = default_gatherer ; 1035 if (!default_using_server2.isEmpty()) defaultUsingServer2_ = default_using_server2 ; 1036 } 975 1037 } 976 1038 -
XIOS3/trunk/src/node/context.hpp
r2509 r2547 112 112 bool scheduledEventLoop(bool enableEventsProcessing=true) ; 113 113 void globalEventLoop(void); 114 void yield(void) ; 115 void synchronize(void) ; 114 116 115 117 // Finalize a context … … 368 370 int getIntraCommRank(void) {return intraCommRank_;} 369 371 int getIntraCommSize(void) {return intraCommSize_;} 372 373 public: 374 shared_ptr<CEventScheduler> getEventScheduler(void) {return eventScheduler_ ;} 370 375 private: 371 376 shared_ptr<CEventScheduler> eventScheduler_ ; //! The local event scheduler for context -
XIOS3/trunk/src/node/pool_node.cpp
r2458 r2547 1 1 #include "pool_node.hpp" 2 2 #include "cxios.hpp" 3 #include "thread_manager.hpp" 3 4 #include<cmath> 4 5 … … 61 62 else ERROR("void CPoolNode::allocateRessources(void)",<<"Pool has no name or id, attributes <id> or <name> must be specified") 62 63 ressourcesManager->createPool(poolId, nbRessources) ; 63 ressourcesManager->waitPoolRegistration(poolId) ; 64 if (CThreadManager::isUsingThreads()) 65 while(!ressourcesManager->hasPool(CXios::defaultPoolId)) 66 { 67 CXios::getDaemonsManager()->eventLoop() ; 68 CThreadManager::yield() ; 69 } 70 else ressourcesManager->waitPoolRegistration(poolId) ; 64 71 auto services=this->getAllServiceNodes() ; 65 72 for(auto& service : services) service->allocateRessources(poolId) ; -
XIOS3/trunk/src/node/service_node.cpp
r2458 r2547 60 60 else if (!hasAutoGeneratedId() ) serviceId=getId() ; 61 61 else ERROR("void CServiceNode::allocateRessources(const string& poolId)",<<"Service has no name or id, attributes <id> or <name> must be specified") 62 62 63 servicesManager->createServices(poolId, serviceId, serviceType, nbRessources, nb_partitions, true) ; 64 if (CThreadManager::isUsingThreads()) 65 for(int i=0; i<nb_partitions; i++) 66 while(!servicesManager->hasService(poolId, serviceId, i)) 67 { 68 CXios::getDaemonsManager()->eventLoop() ; 69 CThreadManager::yield() ; 70 } 71 else servicesManager->waitServiceRegistration(CXios::defaultPoolId, CXios::defaultWriterId) ; 72 63 73 } 64 74
Note: See TracChangeset
for help on using the changeset viewer.