Changeset 1148
- Timestamp: 05/31/17 10:36:33 (8 years ago)
- Location: XIOS/dev/dev_olga/src
- Files: 5 edited
XIOS/dev/dev_olga/src/client.cpp
--- client.cpp (r1130)
+++ client.cpp (r1148)

      MPI_Comm CClient::interComm ;
      std::list<MPI_Comm> CClient::contextInterComms;
-     vector<int> CClient::serverLeader;
+     int CClient::serverLeader;
      bool CClient::is_MPI_Initialized ;
-     int CClient::rank = INVALID_RANK;
+     int CClient::rank_ = INVALID_RANK;
      StdOFStream CClient::m_infoStream;
      StdOFStream CClient::m_errorStream;
...
          MPI_Comm_size(CXios::globalComm,&size) ;
-         MPI_Comm_rank(CXios::globalComm,&rank) ;
+         MPI_Comm_rank(CXios::globalComm,&rank_);

          hashAll=new unsigned long[size] ;
...
          myColor=colors[hashClient] ;
-         MPI_Comm_split(CXios::globalComm,myColor,rank,&intraComm) ;
+         MPI_Comm_split(CXios::globalComm,myColor,rank_,&intraComm) ;

          if (CXios::usingServer)
          {
            int clientLeader=leaders[hashClient] ;
-           serverLeader.push_back(leaders[hashServer]);
+           serverLeader=leaders[hashServer] ;
            int intraCommSize, intraCommRank ;
            MPI_Comm_size(intraComm,&intraCommSize) ;
            MPI_Comm_rank(intraComm,&intraCommRank) ;
-           info(50)<<"intercommCreate::client "<<rank<<" intraCommSize : "<<intraCommSize
-                   <<" intraCommRank :"<<intraCommRank<<" clientLeader "<<serverLeader.back()<<endl ;
-           MPI_Intercomm_create(intraComm, 0, CXios::globalComm, serverLeader.back(), 0, &interComm) ;
+           info(50)<<"intercommCreate::client "<<rank_<<" intraCommSize : "<<intraCommSize
+                   <<" intraCommRank :"<<intraCommRank<<" clientLeader "<<serverLeader<<endl ;
+           MPI_Intercomm_create(intraComm, 0, CXios::globalComm, serverLeader, 0, &interComm) ;
+           rank_ = intraCommRank;
          }
          else
...
        {
          MPI_Status status ;
-         MPI_Comm_rank(intraComm,&rank) ;
+         MPI_Comm_rank(intraComm,&rank_) ;

          oasis_get_intercomm(interComm,CXios::xiosCodeId) ;
-         if (rank==0) MPI_Recv(&serverLeader,1, MPI_INT, 0, 0, interComm, &status) ;
+         if (rank_==0) MPI_Recv(&serverLeader,1, MPI_INT, 0, 0, interComm, &status) ;
          MPI_Bcast(&serverLeader,1,MPI_INT,0,intraComm) ;
...
        buffer<<msg ;

-       for (int i = 0; i < serverLeader.size(); ++i)
-       {
-         MPI_Send(buff, buffer.count(), MPI_CHAR, serverLeader[i], 1, CXios::globalComm) ;
-         MPI_Intercomm_create(contextComm, 0, CXios::globalComm, serverLeader[i], 10+globalRank, &contextInterComm) ;
-         info(10)<<"Register new Context : "<<id<<endl ;
-         MPI_Comm inter ;
-         MPI_Intercomm_merge(contextInterComm,0,&inter) ;
-         MPI_Barrier(inter) ;
-
-         context->initClient(contextComm,contextInterComm) ;
-
-         contextInterComms.push_back(contextInterComm);
-         MPI_Comm_free(&inter);
-       }
+       MPI_Send(buff, buffer.count(), MPI_CHAR, serverLeader, 1, CXios::globalComm) ;
+       MPI_Intercomm_create(contextComm, 0, CXios::globalComm, serverLeader, 10+globalRank, &contextInterComm) ;
+       info(10)<<"Register new Context : "<<id<<endl ;
+       MPI_Comm inter ;
+       MPI_Intercomm_merge(contextInterComm,0,&inter) ;
+       MPI_Barrier(inter) ;
+
+       context->initClient(contextComm,contextInterComm) ;
+
+       contextInterComms.push_back(contextInterComm);
+       MPI_Comm_free(&inter);
        delete [] buff ;
...
      }

+     /*!
+      * Return rank in model intraComm
+      */
      int CClient::getRank()
      {
-       return rank ;
+       return rank_;
      }
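The functional change on the client side is that serverLeader collapses from a vector to a single int, so context registration sends one message to one server leader and creates one intercommunicator, then merges it for a synchronizing barrier. A minimal standalone sketch of that handshake pattern (not XIOS code; the communicator split, leader choice, and tag are all illustrative assumptions):

#include <mpi.h>

// Sketch: split the world into clients and one server rank, then build
// an intercommunicator between the two groups, as in the diff above.
// Run with at least 2 ranks; the last world rank plays the server.
int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  int globalRank, worldSize;
  MPI_Comm_rank(MPI_COMM_WORLD, &globalRank);
  MPI_Comm_size(MPI_COMM_WORLD, &worldSize);

  int serverLeader = worldSize - 1;             // illustrative leader choice
  int color = (globalRank == serverLeader) ? 1 : 0;

  MPI_Comm intraComm;                           // clients vs. server split
  MPI_Comm_split(MPI_COMM_WORLD, color, globalRank, &intraComm);

  // Clients point at the server leader; the server points at world rank 0
  // (the client leader). Tag 0 matches on both sides.
  MPI_Comm interComm;
  MPI_Intercomm_create(intraComm, 0, MPI_COMM_WORLD,
                       color == 0 ? serverLeader : 0, 0, &interComm);

  // Merge and barrier, mirroring the MPI_Intercomm_merge/MPI_Barrier
  // pair that the changeset keeps in registerContext.
  MPI_Comm merged;
  MPI_Intercomm_merge(interComm, color, &merged);
  MPI_Barrier(merged);

  MPI_Comm_free(&merged);
  MPI_Comm_free(&interComm);
  MPI_Comm_free(&intraComm);
  MPI_Finalize();
  return 0;
}

Note also that rank_ is reassigned to intraCommRank once intraComm exists, which is why getRank() is now documented as returning the rank in the model intraComm rather than in the global communicator.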
XIOS/dev/dev_olga/src/client.hpp
--- client.hpp (r1130)
+++ client.hpp (r1148)

        static MPI_Comm interComm;
        static std::list<MPI_Comm> contextInterComms;
-       static vector<int> serverLeader;
+       static int serverLeader;
        static bool is_MPI_Initialized ;
...
      protected:
-       static int rank ;
+       static int rank_;                //!< Rank in model intraComm
        static StdOFStream m_infoStream;
        static StdOFStream m_errorStream;
XIOS/dev/dev_olga/src/context_client.hpp
--- context_client.hpp (r1139)
+++ context_client.hpp (r1148)

        std::list<int> ranksServerNotLeader;

-     public: // Some function should be removed in the future
-       // void registerEvent(CEventClient& event);
-       // list<CBufferOut*> newEvent(CEventClient& event,list<int>& sizes);
-       // bool locked;
-       // set<int> connectedServer;
-
    };
  }
XIOS/dev/dev_olga/src/server.cpp
--- server.cpp (r1142)
+++ server.cpp (r1148)

    int CServer::nbPools = 0;
    int CServer::poolId = 0;
-   int CServer::nbContexts_ = 0;
+   int CServer::nbContexts = 0;
    bool CServer::isRoot = false ;
    int CServer::rank_ = INVALID_RANK;
...
     * In case of secondary servers intraComm is created for each secondary server pool.
     * (For now the assumption is that there is one proc per pool.)
-    * Creates the following lists of interComms:
+    * Creates interComm and stores them into the following lists:
     *   classical server -- interCommLeft
     *   primary server -- interCommLeft and interCommRight
-    *   secondary server -- interComm for each pool.
+    *   secondary server -- interCommLeft for each pool.
     */
    void CServer::initialize(void)
...
        map<unsigned long, int> colors ;
        map<unsigned long, int> leaders ;
-       map<unsigned long, int> lastProcesses ; // needed in case of two server levels
        map<unsigned long, int>::iterator it ;
...
            c++ ;
          }
-         if (hashAll[i] == hashServer) ++nbSrv;
-         //if (hashAll[i+1] != hashAll[i]) // Potential bug here!
-         //  lastProcesses[hashAll[i]]=i ; // It seems that lastProcesses is only used for calculating the server size. Can we count server size directly?
+         if (hashAll[i] == hashServer) ++serverSize_;
        }
...
        {
          int serverRank = rank_ - leaders[hashServer]; // server proc rank starting 0
-         serverSize_ = nbSrv; //lastProcesses[hashServer] - leaders[hashServer] + 1;
-         // serverSize_ = lastProcesses - leaders[hashServer];
          nbPools = serverSize_ * CXios::ratioServer2 / 100;
          if ( serverRank < (serverSize_ - nbPools) )
...
      {
        listenContext();
+       listenRootContext();
        if (!finished) listenFinalize() ;
      }
...
      int size ;
      MPI_Comm_size(intraComm,&size) ;
-     MPI_Request* requests= new MPI_Request[size-1] ;
-     MPI_Status* status= new MPI_Status[size-1] ;
-
-     for(int i=1;i<size;i++)
-     {
-       MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
-     }
-     MPI_Waitall(size-1,requests,status) ;
-     registerContext(buff,count,it->second.leaderRank) ;
+     // MPI_Request* requests= new MPI_Request[size-1] ;
+     // MPI_Status* status= new MPI_Status[size-1] ;
+     MPI_Request* requests= new MPI_Request[size] ;
+     MPI_Status* status= new MPI_Status[size] ;
+
+     CMessage msg ;
+     msg<<id<<it->second.leaderRank;
+     int messageSize=msg.size() ;
+     void * sendBuff = new char[messageSize] ;
+     CBufferOut sendBuffer(sendBuff,messageSize) ;
+     sendBuffer<<msg ;
+
+     // Include root itself in order not to have a divergence
+     for(int i=0; i<size; i++)
+     {
+       MPI_Isend(sendBuff,count,MPI_CHAR,i,2,intraComm,&requests[i]) ;
+     }
+
+     // for(int i=1;i<size;i++)
+     // {
+     //   MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
+     // }
+     // MPI_Waitall(size-1,requests,status) ;
+     // registerContext(buff,count,it->second.leaderRank) ;

      recvContextId.erase(it) ;
...
      static bool recept=false ;
      int rank ;
-     int count ;
+     // int count ;
+     static int count ;
      const int root=0 ;
      boost::hash<string> hashString;
+     size_t hashId = hashString("RegisterContext");
+
+     // (1) Receive context id from the root
      if (recept==false)
      {
...
        }
      }
+     // (2) If context id is received, save it into a buffer and register an event
      else
      {
...
        {
          MPI_Get_count(&status,MPI_CHAR,&count) ;
-         registerContext(buffer,count) ;
-         delete [] buffer ;
+         eventScheduler->registerEvent(nbContexts,hashId);
+         // registerContext(buffer,count) ;
+         // delete [] buffer ;
          recept=false ;
        }
+     }
+     // (3) If event has been scheduled, call register context
+     if (eventScheduler->queryEvent(nbContexts,hashId))
+     {
+       registerContext(buffer,count) ;
+       ++nbContexts;
+       delete [] buffer ;
      }
...
      string contextId;
      CBufferIn buffer(buff, count);
-     buffer >> contextId;
+     // buffer >> contextId;
+     buffer >> contextId>>leaderRank;
      CContext* context;
...
      contextList[contextId]=context;

-     // Primary or classical server: initialize its own server (CContextServer)
+     // Primary or classical server: create communication channel with a client
+     // (1) create interComm (with a client)
+     // (2) initialize client and server (contextClient and contextServer)
      MPI_Comm inter;
      if (serverLevel < 2)
...

      }
-     // Secondary server: initialize its own server (CContextServer)
+     // Secondary server: create communication channel with a primary server
+     // (1) duplicate interComm with a primary server
+     // (2) initialize client and server (contextClient and contextServer)
+     // Remark: in the case of the secondary server there is no need to create an interComm calling MPI_Intercomm_create,
+     // because interComm of CContext is defined on the same processes as the interComm of CServer.
+     // So just duplicate it.
      else if (serverLevel == 2)
      {
...
      }

-     // Primary server: send create context message to secondary servers and initialize its own client (CContextClient)
+     // Primary server:
+     // (1) send create context message to secondary servers
+     // (2) initialize communication channels with secondary servers (create contextClient and contextServer)
      if (serverLevel == 1)
      {
...
        delete [] buff ;
      }
-     ++nbContexts_;
    }
...
      if (isFinalized)
      {
-       // it->second->postFinalize();
        contextList.erase(it) ;
        break ;
...
    }

-   //! Get rank of the current process
+   //! Get rank of the current process in the intraComm
    int CServer::getRank()
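The server-side pattern worth noting here: the root now Isends the registration message to every rank of intraComm, including itself, so all ranks run the same receive path and cannot diverge, and the actual registerContext call is deferred until the event scheduler reports that the event keyed on nbContexts is due (registerEvent/queryEvent are the calls visible in the diff). A simplified standalone sketch of the "root includes itself" broadcast (illustrative tag and payload; the XIOS event scheduler is not reproduced here):

#include <mpi.h>
#include <cstring>
#include <cstdio>

static const int TAG_CONTEXT = 2;   // illustrative tag

// Sketch: the root posts a nonblocking send to every rank, itself
// included, and every rank (root too) performs a matching receive,
// so all ranks follow one code path.
int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  MPI_Comm comm = MPI_COMM_WORLD;
  int rank, size;
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);

  const int root = 0;
  char msg[32] = {0};
  MPI_Request* requests = nullptr;

  if (rank == root)
  {
    std::strcpy(msg, "register:ctx0");       // hypothetical context id
    requests = new MPI_Request[size];
    for (int i = 0; i < size; ++i)           // include root itself
      MPI_Isend(msg, sizeof(msg), MPI_CHAR, i, TAG_CONTEXT, comm, &requests[i]);
  }

  char buffer[32];
  MPI_Recv(buffer, sizeof(buffer), MPI_CHAR, root, TAG_CONTEXT, comm,
           MPI_STATUS_IGNORE);               // every rank, root included
  std::printf("rank %d received '%s'\n", rank, buffer);

  if (rank == root)
  {
    MPI_Waitall(size, requests, MPI_STATUSES_IGNORE);
    delete [] requests;
  }
  MPI_Finalize();
  return 0;
}

Sending to self costs one extra local message, but it removes the special case the old code had, where the root called registerContext directly while all other ranks received the buffer, which is exactly the code path the changeset comments out.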
XIOS/dev/dev_olga/src/server.hpp
--- server.hpp (r1077)
+++ server.hpp (r1148)

    private:
      static int rank_;
      static int serverLeader_;  //!< Leader of the classical or primary server (needed in case of secondary servers)
      static int serverSize_;    //!< Number of procs dedicated to servers (primary and seconday (if any) combined)
      static int nbPools;        //!< Number of secondary server pools
      static int poolId;         //!< id of a secondary server pool starting from 1
-     static int nbContexts_;
+     static int nbContexts;     //!< Number of contexts registered by server
      static StdOFStream m_infoStream;
      static StdOFStream m_errorStream;