Ignore:
Timestamp:
02/17/17 19:51:36 (7 years ago)
Author:
oabramkina
Message:

dev: intermediate commit.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_olga/src/server.cpp

    r1021 r1054  
    2020    list<MPI_Comm> CServer::interCommLeft ; 
    2121    list<MPI_Comm> CServer::interCommRight ; 
    22     list<MPI_Comm> CServer::interComm ; 
     22//    list<MPI_Comm> CServer::interComm ; 
    2323    std::list<MPI_Comm> CServer::contextInterComms; 
    2424    int CServer::serverLevel = 0 ; 
     25    int CServer::serverLeader = 0; 
     26    int CServer::serverSize = 0; 
    2527    int CServer::nbPools = 0; 
    2628    int CServer::poolId = 0; 
    27     int CServer::serverSize = 0; 
    2829    bool CServer::isRoot = false ; 
    2930    int CServer::rank = INVALID_RANK; 
     
    3839/*! 
    3940 * \fn void CServer::initialize(void) 
    40  * Creates intraComm and interComm for a server pool (primary or secondary). 
     41 * Creates intraComm for each possible type of servers (classical, primary or secondary). 
     42 * In case of secondary servers intraComm is created for each secondary server pool. 
     43 * (For now the assumption is that there is one proc per pool.) 
     44 * Creates the following lists of interComms: 
     45 *   classical server -- interCommLeft 
     46 *   primary server -- interCommLeft and interCommRight 
     47 *   secondary server -- interComm for each pool. 
    4148 */ 
    4249    void CServer::initialize(void) 
     
    6471 
    6572        unsigned long* hashAll ; 
    66 //        unsigned long* hashAllServers ; 
    6773 
    6874//        int rank ; 
     
    129135              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ; 
    130136               interCommLeft.push_back(newComm) ; 
    131                interComm.push_back(newComm) ; 
    132137            } 
    133138          } 
     
    135140        else if (serverLevel == 1) 
    136141        { 
    137           int clientLeader, srvPrmLeader, srvSndLeader; 
     142          int clientLeader, srvSndLeader; 
     143          int srvPrmLeader ; 
    138144          for (it=leaders.begin();it!=leaders.end();it++) 
    139145          { 
     
    148154              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ; 
    149155              interCommLeft.push_back(newComm) ; 
    150               interComm.push_back(newComm) ; 
    151156            } 
    152157            else 
    153               srvPrmLeader = it->second; 
     158              serverLeader = it->second; 
    154159          } 
    155160 
    156161          for (int i = 0; i < nbPools; ++i) 
    157162          { 
    158             srvSndLeader = srvPrmLeader + serverSize - nbPools + i; 
    159 //            CClient::initializeClientOnServer(rank, serversComm, srvSndLeader); 
    160             CClient::initializeClientOnServer(rank, intraComm, srvSndLeader); 
    161             interCommRight.push_back(CClient::getInterComm()); 
    162             interComm.push_back(CClient::getInterComm()); 
     163            srvSndLeader = serverLeader + serverSize - nbPools + i; 
     164            int intraCommSize, intraCommRank ; 
     165            MPI_Comm_size(intraComm, &intraCommSize) ; 
     166            MPI_Comm_rank(intraComm, &intraCommRank) ; 
     167            info(50)<<"intercommCreate::client "<<rank<<" intraCommSize : "<<intraCommSize 
     168                <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< srvSndLeader<<endl ; 
     169            MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvSndLeader, 0, &newComm) ; 
     170            interCommRight.push_back(newComm) ; 
    163171          } 
    164172        } // primary server 
     
    175183          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ; 
    176184          interCommLeft.push_back(newComm) ; 
    177           interComm.push_back(newComm) ; 
    178185        } // secondary server 
    179186 
     
    210217          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ; 
    211218          MPI_Comm_remote_size(newComm,&size); 
    212           interComm.push_back(newComm) ; 
     219//          interComm.push_back(newComm) ; 
     220          interCommLeft.push_back(newComm) ; 
    213221        } 
    214222              oasis_enddef() ; 
    215223      } 
    216224 
    217       MPI_Comm_rank(intraComm, &rank) ; 
    218       if (rank==0) isRoot=true; 
     225      int rankServer; 
     226      MPI_Comm_rank(intraComm, &rankServer) ; 
     227      if (rankServer==0) isRoot=true; 
    219228      else isRoot=false; 
    220229       
     
    235244//        MPI_Comm_free(&(*it)); 
    236245 
    237       for (std::list<MPI_Comm>::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++) 
    238         MPI_Comm_free(&(*it)); 
    239  
    240       for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++) 
    241         MPI_Comm_free(&(*it)); 
     246        for (std::list<MPI_Comm>::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++) 
     247          MPI_Comm_free(&(*it)); 
     248 
     249        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++) 
     250          MPI_Comm_free(&(*it)); 
    242251 
    243252      MPI_Comm_free(&intraComm); 
     
    273282 
    274283         contextEventLoop() ; 
     284//         if (finished && contextList.empty()) stop=true ; 
    275285         if (finished && contextList.empty()) stop=true ; 
    276286         eventScheduler->checkEvent() ; 
     
    294304           if (flag==true) 
    295305           { 
    296              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ; 
    297              info(20)<<" CServer : Receive client finalize"<<endl ; 
    298  
    299              // If primary server, send finalize to secondary server pool(s) 
    300              for(itr=interCommRight.begin(); itr!=interCommRight.end(); itr++) 
    301              { 
    302                MPI_Send(&msg,1,MPI_INT,0,0,*itr) ; 
    303 //               MPI_Comm_free(&(*itr)); 
    304 //               interCommRight.erase(itr) ; 
    305              } 
    306  
     306              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ; 
     307              info(20)<<" CServer : Receive client finalize"<<endl ; 
     308              // Sending server finalize message to secondary servers (if any) 
     309              for(itr=interCommRight.begin();itr!=interCommRight.end();itr++) 
     310              { 
     311                MPI_Send(&msg,1,MPI_INT,0,0,*itr) ; 
     312//                itr = interCommRight.erase(itr) ; 
     313              } 
    307314              MPI_Comm_free(&(*it)); 
    308 //              interComm.erase(it) ; 
    309315              interCommLeft.erase(it) ; 
    310316              break ; 
     
    312318         } 
    313319 
    314         if (interCommLeft.empty()) 
    315 //        if (interComm.empty()) 
     320         if (interCommLeft.empty()) 
    316321         { 
    317322           int i,size ; 
     
    381386           MPI_Get_count(&status,MPI_CHAR,&count) ; 
    382387           recvContextMessage(buffer,count) ; 
    383            delete [] buffer ; 
     388           delete [] buffer; 
    384389           recept=false ; 
    385390         } 
     
    390395     { 
    391396       static map<string,contextMessage> recvContextId; 
    392  
    393397       map<string,contextMessage>::iterator it ; 
    394  
    395398       CBufferIn buffer(buff,count) ; 
    396399       string id ; 
     
    434437     void CServer::listenRootContext(void) 
    435438     { 
    436  
    437439       MPI_Status status ; 
    438440       int flag ; 
     
    464466           MPI_Get_count(&status,MPI_CHAR,&count) ; 
    465467           registerContext(buffer,count) ; 
    466  
    467468           delete [] buffer ; 
    468469           recept=false ; 
     
    484485               << "Context '" << contextId << "' has already been registred"); 
    485486 
    486        MPI_Comm contextInterComm; 
    487        MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextInterComm); 
    488  
    489        MPI_Comm inter; 
    490        MPI_Intercomm_merge(contextInterComm,1,&inter); 
    491        MPI_Barrier(inter); 
    492  
    493487       context=CContext::create(contextId); 
    494488       contextList[contextId]=context; 
    495        context->initServer(intraComm,contextInterComm); 
    496        contextInterComms.push_back(contextInterComm); 
    497  
     489 
     490       // All type of servers initialize its own server (CContextServer) 
     491       if (serverLevel < 2) 
     492       { 
     493         MPI_Comm contextInterComm; 
     494         MPI_Intercomm_create(intraComm, 0, CXios::globalComm, leaderRank, 10+leaderRank, &contextInterComm); 
     495         MPI_Comm inter; 
     496         MPI_Intercomm_merge(contextInterComm,1,&inter); 
     497         MPI_Barrier(inter); 
     498         MPI_Comm_free(&inter); 
     499         context->initServer(intraComm,contextInterComm); 
     500         contextInterComms.push_back(contextInterComm); 
     501       } 
     502       else if (serverLevel == 2) 
     503       { 
     504         context->initServer(intraComm, interCommLeft.front()); 
     505       } 
     506 
     507       // Primary server: send create context message to secondary servers and initialize its own client (CContextClient) 
    498508       if (serverLevel == 1) 
    499509       { 
    500 //         CClient::registerContext(contextId, intraComm); 
    501          CClient::registerContextByClienOfServer(contextId, intraComm); 
    502        } 
    503  
    504        MPI_Comm_free(&inter); 
    505  
     510         int i = 0, size; 
     511         CMessage msg; 
     512         int messageSize; 
     513         MPI_Comm_size(intraComm, &size) ; 
     514         for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++, ++i) 
     515         { 
     516           StdString str = contextId +"_server_" + boost::lexical_cast<string>(i); 
     517           msg<<str<<size<<rank ; 
     518           messageSize = msg.size() ; 
     519           buff = new char[messageSize] ; 
     520           CBufferOut buffer(buff,messageSize) ; 
     521           buffer<<msg ; 
     522           int sndServerGloRanks = serverSize-nbPools+serverLeader +i;  // the assumption is that there is only one proc per secondary server pool 
     523           MPI_Send(buff, buffer.count(), MPI_CHAR, sndServerGloRanks, 1, CXios::globalComm) ; 
     524           context->initClient(intraComm, *it) ; 
     525           delete [] buff ; 
     526         } 
     527       } 
    506528     } 
    507529 
     
    509531     { 
    510532       bool finished ; 
     533 
    511534       map<string,CContext*>::iterator it ; 
    512535 
    513536       for(it=contextList.begin();it!=contextList.end();it++) 
    514537       { 
    515          finished=it->second->checkBuffersAndListen(); 
     538         finished=it->second->isFinalized(); 
    516539         if (finished) 
    517540         { 
     
    519542           break ; 
    520543         } 
     544         else 
     545           finished=it->second->checkBuffersAndListen(); 
    521546       } 
    522547     } 
     
    554579      { 
    555580        if (serverLevel == 1) 
    556           id = getRank(); 
     581          id = rank-serverLeader; 
    557582        else 
    558583          id = poolId; 
Note: See TracChangeset for help on using the changeset viewer.