Changeset 1765
- Timestamp: 11/06/19 11:03:38 (3 years ago)
- Location: XIOS/dev/dev_ym/XIOS_SERVICES/src
- Files: 12 edited
XIOS/dev/dev_ym/XIOS_SERVICES/src/client.cpp
r1764 → r1765
- Removed CClient::initRessources(), which contained nothing but a commented-out test creating two "ioserver" pools from ranks 0 and 1.
- In CClient::initialize(): removed the commented-out experiment that created server contexts through the contexts manager (createServerContext / createServerContextIntercomm) and the commented-out wait loop; the function still ends with returnComm = clientComm.
- Added the comment "// to check on other architecture" above CClient::xiosGlobalCommByPublishing().
- Removed CClient::initialize_old(): the former initialization path that allgathered hashes of the code ids over CXios::globalComm, split the global communicator into one intracommunicator per code, created the intercommunicator with the server leader (or duplicated intraComm when no server is used), and handled the OASIS variant through oasis_init / oasis_get_localcomm / oasis_get_intercomm (a standalone sketch of that splitting pattern closes this section).
- CClient::registerContext() keeps the new service-based flow, reproduced below; only a commented-out block that registered the context directly with the contexts manager was deleted.
- Removed CClient::registerContext_old(): the old registration that either duplicated the context communicator in attached mode, or sent a context message to the server leader and built a context intercommunicator with MPI_Intercomm_create.
- Removed CClient::finalize_old(): the old finalization that notified the server, freed the context intercommunicators, suspended the timers, printed the performance and memory reports, and called MPI_Finalize or oasis_finalize. The helper returning the global rank (or the rank in the model intraComm when OASIS is used) is unchanged.

The surviving CClient::registerContext() now reads:

```cpp
void CClient::registerContext(const string& id, MPI_Comm contextComm)
{
  int commRank, commSize ;
  MPI_Comm_rank(contextComm,&commRank) ;
  MPI_Comm_size(contextComm,&commSize) ;

  getPoolRessource()->createService(contextComm, id, 0, CServicesManager::CLIENT, 1) ;
  getPoolRessource()->createService(contextComm, CXios::defaultServerId, 0, CServicesManager::IO_SERVER, 1) ;

  if (commRank==0) while (!CXios::getServicesManager()->hasService(getPoolRessource()->getId(), id, 0)) { CXios::getDaemonsManager()->eventLoop(); }

  if (commRank==0) CXios::getContextsManager()->createServerContext(getPoolRessource()->getId(), id, 0, id) ;
  int type=CServicesManager::CLIENT ;
  string name = CXios::getContextsManager()->getServerContextName(getPoolRessource()->getId(), id, 0, type, id) ;
  while (!CXios::getContextsManager()->hasContext(name, contextComm))
  {
    CXios::getDaemonsManager()->eventLoop() ;
  }
}
```
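The hash-and-split logic that initialize_old implemented is a standard MPI idiom. The following self-contained sketch shows the pattern outside of the XIOS classes; the function name and its arguments are illustrative, and std::hash stands in for the boost::hash used in the removed code.

```cpp
#include <mpi.h>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Split `globalComm` into one intracommunicator per distinct code id and build
// an intercommunicator with the group identified by `peerId` (sketch of the
// pattern used by the removed CClient::initialize_old; error handling omitted).
MPI_Comm splitByCodeId(MPI_Comm globalComm, const std::string& codeId,
                       const std::string& peerId, MPI_Comm& interComm)
{
  int size, rank;
  MPI_Comm_size(globalComm, &size);
  MPI_Comm_rank(globalComm, &rank);

  unsigned long myHash   = std::hash<std::string>{}(codeId);
  unsigned long peerHash = std::hash<std::string>{}(peerId);
  std::vector<unsigned long> allHashes(size);
  MPI_Allgather(&myHash, 1, MPI_UNSIGNED_LONG,
                allHashes.data(), 1, MPI_UNSIGNED_LONG, globalComm);

  // The first occurrence of each hash defines its color and its leader rank.
  std::map<unsigned long, int> colors, leaders;
  int c = 0;
  for (int i = 0; i < size; i++)
    if (colors.find(allHashes[i]) == colors.end())
    { colors[allHashes[i]] = c++; leaders[allHashes[i]] = i; }

  MPI_Comm intraComm;
  MPI_Comm_split(globalComm, colors[myHash], rank, &intraComm);

  // Connect to the peer group through its leader (tag 0, as in the old code).
  MPI_Intercomm_create(intraComm, 0, globalComm, leaders[peerHash], 0, &interComm);
  return intraComm;
}
```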
XIOS/dev/dev_ym/XIOS_SERVICES/src/client.hpp
r1761 → r1765
- Removed the declarations of initialize_old(), finalize_old() and initRessources().
- The declarations of initialize(), xiosGlobalCommByFileExchange(), xiosGlobalCommByPublishing(), finalize(), registerContext(), registerContext_old() and callOasisEnddef() are unchanged.
XIOS/dev/dev_ym/XIOS_SERVICES/src/context_client.cpp
r1764 → r1765
- After buffer->checkBuffer(true): removed the commented-out block that, outside attached mode, built a one-sided communicator per buffer (MPI_Intercomm_create on commSelf over interCommMerged followed by MPI_Intercomm_merge) and called buffer->createWindows() on it; a plain-MPI sketch of that pairing closes this section.
- In the finalization path: removed the commented-out alternative that pushed the finalize event to the server leaders, and the commented-out releaseBuffers() call (the release was moved to CContext::finalize()); the blocking-time timer and the memory report on the total buffer size are unchanged.
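The removed (already commented-out) block paired each client rank with its server counterpart through an intercommunicator merge before exposing the buffer for one-sided access. A minimal sketch of that shape, assuming the XIOS-specific createWindows() boils down to creating an MPI RMA window; the function below and its arguments are illustrative:

```cpp
#include <mpi.h>
#include <vector>

// Build a two-member intracommunicator linking the local process (alone in
// commSelf) with `remoteLeader` of `mergedComm`, then expose `buffer` as an
// RMA window on it. Sketch only; error handling and window lifetime omitted.
MPI_Win exposeBufferToPeer(MPI_Comm commSelf, MPI_Comm mergedComm,
                           int remoteLeader, std::vector<char>& buffer)
{
  MPI_Comm oneSidedInterComm, oneSidedComm;
  MPI_Intercomm_create(commSelf, 0, mergedComm, remoteLeader, 0, &oneSidedInterComm);
  MPI_Intercomm_merge(oneSidedInterComm, /*high=*/0, &oneSidedComm);

  MPI_Win win;
  MPI_Win_create(buffer.data(), static_cast<MPI_Aint>(buffer.size()), /*disp_unit=*/1,
                 MPI_INFO_NULL, oneSidedComm, &win);
  return win;
}
```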
XIOS/dev/dev_ym/XIOS_SERVICES/src/context_server.cpp
r1764 → r1765
- Kept the broadcast of hashId to every server process of the context (MPI_Bcast with MPI_SIZE_T) and removed the commented-out boost::hash computation of that id that preceded it; a standalone sketch of the hash-and-broadcast step closes this section.
- In the buffer-creation path: kept the mapBufferSize_ insertion and the construction of the CServerBuffer from windows / winAdress / buffSize, and removed the commented-out per-rank window creation (intercommunicator create and merge plus createWindows, mirroring the block removed from context_client.cpp); the lastTimeLine initialization is unchanged.
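In isolation, the surviving broadcast step amounts to rank 0 hashing the context identifier and sharing the value so that every process of the context uses the same scheduling key. A hedged sketch, where std::hash replaces the boost::hash of the removed comment and MPI_UINT64_T stands in for the XIOS MPI_SIZE_T alias:

```cpp
#include <mpi.h>
#include <cstdint>
#include <functional>
#include <string>

// Rank 0 hashes the context id; the value is broadcast so that all processes
// of `intraComm` agree on the same 64-bit key. Illustrative sketch only.
uint64_t agreeOnContextHash(const std::string& contextId, MPI_Comm intraComm)
{
  int rank;
  MPI_Comm_rank(intraComm, &rank);
  uint64_t hashId = 0;
  if (rank == 0) hashId = static_cast<uint64_t>(std::hash<std::string>{}(contextId));
  MPI_Bcast(&hashId, 1, MPI_UINT64_T, 0, intraComm);
  return hashId;
}
```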
XIOS/dev/dev_ym/XIOS_SERVICES/src/cxios.cpp
r1764 → r1765
- Around the server start-up: removed a duplicated, commented-out CServer::initialize() call and the commented-out creation of the global registry; the active CServer::initialize() call is kept.
- Removed the large commented-out block that opened the server info/error streams per server level, ran CServer::eventLoop(), and, at shutdown, merged the registries of the secondary-server pools onto the first pool (exchanging the registry size and then the serialized buffer with MPI_Send / MPI_Recv on tag 15) before writing xios_registry.bin. The sketch after this list shows the size-then-payload exchange in isolation.
- The function now goes straight from CServer::initialize() to CServer::finalize().
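The removed registry-merging code used the usual two-message idiom for variable-length data: send the byte count first, then the serialized buffer. Stripped of the CRegistry / CBufferOut machinery, the exchange reduces to the following sketch (function names are illustrative; tag 15 is kept from the removed code):

```cpp
#include <mpi.h>
#include <vector>

// Sender: announce the size, then ship the serialized bytes (tag 15).
void sendBlob(const std::vector<char>& blob, int dest, MPI_Comm comm)
{
  long size = static_cast<long>(blob.size());
  MPI_Send(&size, 1, MPI_LONG, dest, 15, comm);
  MPI_Send(blob.data(), static_cast<int>(size), MPI_CHAR, dest, 15, comm);
}

// Receiver: learn the size first, then receive exactly that many bytes.
std::vector<char> recvBlob(int source, MPI_Comm comm)
{
  MPI_Status status;
  long size = 0;
  MPI_Recv(&size, 1, MPI_LONG, source, 15, comm, &status);
  std::vector<char> blob(static_cast<size_t>(size));
  MPI_Recv(blob.data(), static_cast<int>(size), MPI_CHAR, source, 15, comm, &status);
  return blob;
}
```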
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/contexts_manager.cpp
r1764 → r1765
- Removed the commented-out variant of createServerContextIntercomm() that registered an event with the pool service's event scheduler, polled the daemons manager until the event was granted, broadcast the result over intraComm, and then created the intercommunicator with MPI_Intercomm_create (tag 3141). CContextsManager::sendNotification() follows unchanged.
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/contexts_manager.hpp
r1764 → r1765
- Removed the commented-out declaration of the old createServerContextIntercomm() overload taking an intraComm and returning an InterComm; the overload taking a sourceContext string, as well as createServerContext(), remain.
XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/server_context.cpp
r1764 → r1765
- Kept the MPI_Test polling loop on the pending request and the MPI_Bcast of the ok flag; removed the commented-out alternative that registered an event on the parent service's event scheduler and polled queryEvent() / servicesEventLoop() instead. Both variants follow the same test-while-progressing idiom, sketched below.
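Both the code that stays and the commented-out alternative that was deleted follow the same non-blocking idiom: keep servicing other work while periodically testing whether the awaited operation has completed. A minimal sketch, with a generic progress callback standing in for the daemons-manager event loop:

```cpp
#include <mpi.h>
#include <functional>

// Wait for `req` to complete without blocking the process: keep calling
// `progress` (e.g. a daemon event loop) until MPI_Test reports completion.
void waitWhileProgressing(MPI_Request& req, const std::function<void()>& progress)
{
  int flag = 0;
  MPI_Status status;
  while (!flag)
  {
    progress();                  // service other pending events in the meantime
    MPI_Test(&req, &flag, &status);
  }
}
```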
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/context.cpp
r1764 → r1765
- Removed the old CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer), explicitly marked "old interface to be removed": it created the CContextClient / CContextServer pair, duplicated the communicators outside attached mode, set up the registry in/out around xios_registry.bin, and, on a primary server, filled clientPrimServer / serverPrimServer instead.
- In the new buffer-setup path: kept the comment stating that the client intraComm is no longer duplicated, and removed the commented-out duplication block as well as the commented-out createServerContextIntercomm() call (parentServerContext_->createIntercomm() is used instead); the stray commented declarations of interCommClient / interCommServer were also deleted.
- Removed CContext::checkBuffersAndListen(), superseded by CContext::eventLoop().
- Removed CContext::finalize_old(): the old multi-level finalization that tracked countChildCtx_, sent the finalize message to the server, drained the pending buffers, gathered the registry, closed the files and freed the internally allocated communicators (the generic drain-then-free order is sketched below). The new finalize() still ends by logging that the context is finalized.
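finalize_old always drained outstanding traffic before releasing the communicators it had duplicated. Reduced to plain MPI requests rather than the XIOS buffer classes, that shutdown order looks roughly like this (illustrative sketch, not the XIOS API):

```cpp
#include <mpi.h>
#include <list>
#include <vector>

// Complete every outstanding request, then free the communicators duplicated
// during initialization: drain first, free afterwards (sketch of the order
// followed by the removed finalize_old).
void drainAndFree(std::vector<MPI_Request>& pending, std::list<MPI_Comm>& ownedComms)
{
  if (!pending.empty())
    MPI_Waitall(static_cast<int>(pending.size()), pending.data(), MPI_STATUSES_IGNORE);
  pending.clear();

  for (std::list<MPI_Comm>::iterator it = ownedComms.begin(); it != ownedComms.end(); ++it)
    MPI_Comm_free(&(*it));
  ownedComms.clear();
}
```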
XIOS/dev/dev_ym/XIOS_SERVICES/src/node/context.hpp
r1764 → r1765
- Removed the declarations of the old initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer = 0), of checkBuffersAndListen() and of finalize_old(); init(), the new initClient(intraComm, serviceType), eventLoop(), globalEventLoop(), finalize() and isFinalized() remain.
XIOS/dev/dev_ym/XIOS_SERVICES/src/server.cpp
r1764 → r1765
- Removed CServer::initRessources(): a hard-coded test in which the server leader created an "LMDZ" pool over all available resources, an "ioserver" IO_SERVER service (8 processes, 5 instances) and five "lmdz" server contexts, then spun forever in daemonsManager->eventLoop().
- Removed CServer::initialize_old() and its documentation: the former two-level server set-up that split CXios::globalComm by hashed code id, distributed the server processes between the classical/primary level and the secondary pools according to CXios::ratioServer2 and CXios::nbPoolsServer2 (warning and falling back to the classical server mode when the requested split was impossible), built interCommLeft / interCommRight with MPI_Intercomm_create, handled the equivalent OASIS path (oasis_init, oasis_get_localcomm, oasis_get_intercomm, optional oasis_enddef), and finally created the CEventScheduler.
- In CServer::finalize(): removed the commented-out loops freeing interComm and interCommLeft; freeing of the context intra/inter communicators and of interCommRight, and the performance report, are unchanged.
- Removed the whole message-listening machinery of the old server loop: eventLoop(), listenFinalize(), listenRootFinalize(), listenOasisEnddef(), listenRootOasisEnddef() (the tag-5 oasis_enddef relay), listenContext(), recvContextMessage(), listenRootContext(), registerContext() (which created the per-context intercommunicator, or forwarded the context to the secondary servers at level 1), contextEventLoop(), getRank() and getSecondaryServerGlobalRanks(). The probe-then-receive idiom these listeners relied on is sketched below.
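The removed listeners (listenContext, listenRootContext, listenFinalize, ...) all relied on the same idiom for receiving messages of unknown size without blocking: probe, query the length, allocate, then post a non-blocking receive. A self-contained sketch of that pattern (function name and arguments are illustrative):

```cpp
#include <mpi.h>
#include <vector>

// Non-blocking reception of a message whose size is not known in advance:
// probe for it, query its length, then post the receive into a buffer of the
// right size. Completion is checked later with MPI_Test on `request`.
bool tryReceiveMessage(MPI_Comm comm, int tag, std::vector<char>& buffer, MPI_Request& request)
{
  int flag = 0;
  MPI_Status status;
  MPI_Iprobe(MPI_ANY_SOURCE, tag, comm, &flag, &status);
  if (!flag) return false;                      // nothing pending yet

  int count = 0;
  MPI_Get_count(&status, MPI_CHAR, &count);
  buffer.resize(count);
  MPI_Irecv(buffer.data(), count, MPI_CHAR, status.MPI_SOURCE, tag, comm, &request);
  return true;
}
```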
XIOS/dev/dev_ym/XIOS_SERVICES/src/server.hpp
r1761 → r1765
- Removed the declarations of initialize_old(), contextEventLoop(), listenContext(), listenFinalize(), recvContextMessage(), listenRootContext(), listenRootFinalize(), listenRootOasisEnddef(), listenOasisEnddef(), registerContext() and initRessources(); initialize(), xiosGlobalCommByFileExchange(), xiosGlobalCommByPublishing(), finalize(), eventLoop() and the intraComm / serversComm_ members remain.