Ignore:
Timestamp:
03/22/18 10:43:20 (6 years ago)
Author:
yushan
Message:

branch_openmp merged with XIOS_DEV_CMIP6@1459

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/branch_openmp/src/server.cpp

    r1347 r1460  
    33#include "cxios.hpp" 
    44#include "server.hpp" 
     5#include "client.hpp" 
    56#include "type.hpp" 
    67#include "context.hpp" 
     
    1819{ 
    1920    MPI_Comm CServer::intraComm ; 
    20     list<MPI_Comm> CServer::interComm ; 
     21    std::list<MPI_Comm> CServer::interCommLeft ; 
     22    std::list<MPI_Comm> CServer::interCommRight ; 
    2123    std::list<MPI_Comm> CServer::contextInterComms; 
    22     bool CServer::isRoot ; 
    23     int CServer::rank = INVALID_RANK; 
     24    std::list<MPI_Comm> CServer::contextIntraComms; 
     25    int CServer::serverLevel = 0 ; 
     26    int CServer::nbContexts = 0; 
     27    bool CServer::isRoot = false ; 
     28    int CServer::rank_ = INVALID_RANK; 
    2429    StdOFStream CServer::m_infoStream; 
    2530    StdOFStream CServer::m_errorStream; 
    2631    map<string,CContext*> CServer::contextList ; 
     32    vector<int> CServer::sndServerGlobalRanks; 
    2733    bool CServer::finished=false ; 
    2834    bool CServer::is_MPI_Initialized ; 
    2935    CEventScheduler* CServer::eventScheduler = 0; 
    30     
     36 
     37//--------------------------------------------------------------- 
     38/*! 
     39 * \fn void CServer::initialize(void) 
     40 * Creates intraComm for each possible type of servers (classical, primary or secondary). 
     41 * (For now the assumption is that there is one proc per secondary server pool.) 
     42 * Creates interComm and stores them into the following lists: 
     43 *   classical server -- interCommLeft 
     44 *   primary server -- interCommLeft and interCommRight 
     45 *   secondary server -- interCommLeft for each pool. 
     46 *   IMPORTANT: CXios::usingServer2 should NOT be used beyond this function. Use CServer::serverLevel instead. 
     47 */ 
    3148    void CServer::initialize(void) 
    3249    { 
    33      // int initialized ; 
    34      // MPI_Initialized(&initialized) ; 
    35      // if (initialized) is_MPI_Initialized=true ; 
    36      // else is_MPI_Initialized=false ; 
     50      //int initialized ; 
     51      //MPI_Initialized(&initialized) ; 
     52      //if (initialized) is_MPI_Initialized=true ; 
     53      //else is_MPI_Initialized=false ; 
     54      int rank ; 
    3755 
    3856      // Not using OASIS 
     
    4058      { 
    4159 
    42        // if (!is_MPI_Initialized) 
    43        // { 
    44        //   MPI_Init(NULL, NULL); 
    45        // } 
     60        //if (!is_MPI_Initialized) 
     61        //{ 
     62        //  MPI_Init(NULL, NULL); 
     63        //} 
    4664        CTimer::get("XIOS").resume() ; 
    4765 
    4866        boost::hash<string> hashString ; 
    49  
    50         unsigned long hashServer=hashString(CXios::xiosCodeId) ; 
     67        unsigned long hashServer = hashString(CXios::xiosCodeId); 
     68 
    5169        unsigned long* hashAll ; 
    52  
    53 //        int rank ; 
     70        unsigned long* srvLevelAll ; 
     71 
    5472        int size ; 
    5573        int myColor ; 
    5674        int i,c ; 
    57         MPI_Comm newComm ; 
    58  
    59         MPI_Comm_size(CXios::globalComm,&size) ; 
    60         MPI_Comm_rank(CXios::globalComm,&rank); 
     75        MPI_Comm newComm; 
     76 
     77        MPI_Comm_size(CXios::globalComm, &size) ; 
     78        MPI_Comm_rank(CXios::globalComm, &rank_); 
     79 
    6180        hashAll=new unsigned long[size] ; 
    62  
    63         MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ; 
     81        MPI_Allgather(&hashServer, 1, MPI_LONG, hashAll, 1, MPI_LONG, CXios::globalComm) ; 
    6482 
    6583        map<unsigned long, int> colors ; 
     
    6785        map<unsigned long, int>::iterator it ; 
    6886 
     87        // (1) Establish client leaders, distribute processes between two server levels 
     88        std::vector<int> srvRanks; 
    6989        for(i=0,c=0;i<size;i++) 
    7090        { 
     
    7595            c++ ; 
    7696          } 
    77         } 
    78  
    79         myColor=colors[hashServer] ; 
    80         MPI_Comm_split(CXios::globalComm,myColor,rank,&intraComm) ; 
    81  
    82         int serverLeader=leaders[hashServer] ; 
    83         int clientLeader; 
    84  
    85          serverLeader=leaders[hashServer] ; 
    86          for(it=leaders.begin();it!=leaders.end();it++) 
    87          { 
    88            if (it->first!=hashServer) 
    89            { 
    90              clientLeader=it->second ; 
    91              int intraCommSize, intraCommRank ; 
    92              MPI_Comm_size(intraComm,&intraCommSize) ; 
    93              MPI_Comm_rank(intraComm,&intraCommRank) ; 
    94              #pragma omp critical (_output) 
    95              { 
    96                info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize 
     97          if (CXios::usingServer2) 
     98            if (hashAll[i] == hashServer) 
     99              srvRanks.push_back(i); 
     100        } 
     101 
     102        if (CXios::usingServer2) 
     103        { 
     104          int reqNbProc = srvRanks.size()*CXios::ratioServer2/100.; 
     105          if (reqNbProc<1 || reqNbProc==srvRanks.size()) 
     106          { 
     107            error(0)<<"WARNING: void CServer::initialize(void)"<<endl 
     108                << "It is impossible to dedicate the requested number of processes = "<<reqNbProc 
     109                <<" to secondary server. XIOS will run in the classical server mode."<<endl; 
     110          } 
     111          else 
     112          { 
     113            int firstSndSrvRank = srvRanks.size()*(100.-CXios::ratioServer2)/100. ; 
     114            int poolLeader = firstSndSrvRank; 
     115//*********** (1) Comment out the line below to set one process per pool 
     116//            sndServerGlobalRanks.push_back(srvRanks[poolLeader]); 
     117            int nbPools = CXios::nbPoolsServer2; 
     118            if ( nbPools > reqNbProc || nbPools < 1) 
     119            { 
     120              error(0)<<"WARNING: void CServer::initialize(void)"<<endl 
     121                  << "It is impossible to allocate the requested number of pools = "<<nbPools 
     122                  <<" on the secondary server. It will be set so that there is one process per pool."<<endl; 
     123              nbPools = reqNbProc; 
     124            } 
     125            int remainder = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) % nbPools; 
     126            int procsPerPool = ((int) (srvRanks.size()*CXios::ratioServer2/100.)) / nbPools; 
     127            for (i=0; i<srvRanks.size(); i++) 
     128            { 
     129              if (i >= firstSndSrvRank) 
     130              { 
     131                if (rank_ == srvRanks[i]) 
     132                { 
     133                  serverLevel=2; 
     134                } 
     135                poolLeader += procsPerPool; 
     136                if (remainder != 0) 
     137                { 
     138                  ++poolLeader; 
     139                  --remainder; 
     140                } 
     141//*********** (2) Comment out the two lines below to set one process per pool 
     142//                if (poolLeader < srvRanks.size()) 
     143//                  sndServerGlobalRanks.push_back(srvRanks[poolLeader]); 
     144//*********** (3) Uncomment the line below to set one process per pool 
     145                sndServerGlobalRanks.push_back(srvRanks[i]); 
     146              } 
     147              else 
     148              { 
     149                if (rank_ == srvRanks[i]) serverLevel=1; 
     150              } 
     151            } 
     152            if (serverLevel==2) 
     153            { 
     154              #pragma omp critical (_output) 
     155              info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ; 
     156              for (i=0; i<sndServerGlobalRanks.size(); i++) 
     157              { 
     158                if (rank_>= sndServerGlobalRanks[i]) 
     159                { 
     160                  if ( i == sndServerGlobalRanks.size()-1) 
     161                  { 
     162                    myColor = colors.size() + sndServerGlobalRanks[i]; 
     163                  } 
     164                  else if (rank_< sndServerGlobalRanks[i+1]) 
     165                  { 
     166                    myColor = colors.size() + sndServerGlobalRanks[i]; 
     167                    break; 
     168                  } 
     169                } 
     170              } 
     171            } 
     172          } 
     173        } 
     174 
     175        // (2) Create intraComm 
     176        if (serverLevel != 2) myColor=colors[hashServer]; 
     177        MPI_Comm_split(CXios::globalComm, myColor, rank_, &intraComm) ; 
     178 
     179        // (3) Create interComm 
     180        if (serverLevel == 0) 
     181        { 
     182          int clientLeader; 
     183          for(it=leaders.begin();it!=leaders.end();it++) 
     184          { 
     185            if (it->first!=hashServer) 
     186            { 
     187              clientLeader=it->second ; 
     188              int intraCommSize, intraCommRank ; 
     189              MPI_Comm_size(intraComm,&intraCommSize) ; 
     190              MPI_Comm_rank(intraComm,&intraCommRank) ; 
     191              #pragma omp critical (_output) 
     192              { 
     193                info(50)<<"intercommCreate::server (classical mode) "<<rank_<<" intraCommSize : "<<intraCommSize 
    97194                       <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ; 
    98              } 
    99              MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ; 
    100              interComm.push_back(newComm) ; 
    101            } 
    102          } 
    103  
    104          delete [] hashAll ; 
     195              } 
     196 
     197              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ; 
     198              interCommLeft.push_back(newComm) ; 
     199            } 
     200          } 
     201        } 
     202        else if (serverLevel == 1) 
     203        { 
     204          int clientLeader, srvSndLeader; 
     205          int srvPrmLeader ; 
     206 
     207          for (it=leaders.begin();it!=leaders.end();it++) 
     208          { 
     209            if (it->first != hashServer) 
     210            { 
     211              clientLeader=it->second ; 
     212              int intraCommSize, intraCommRank ; 
     213              MPI_Comm_size(intraComm, &intraCommSize) ; 
     214              MPI_Comm_rank(intraComm, &intraCommRank) ; 
     215              #pragma omp critical (_output) 
     216              { 
     217                info(50)<<"intercommCreate::server (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize 
     218                       <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ; 
     219              } 
     220              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ; 
     221              interCommLeft.push_back(newComm) ; 
     222            } 
     223          } 
     224 
     225          for (int i = 0; i < sndServerGlobalRanks.size(); ++i) 
     226          { 
     227            int intraCommSize, intraCommRank ; 
     228            MPI_Comm_size(intraComm, &intraCommSize) ; 
     229            MPI_Comm_rank(intraComm, &intraCommRank) ; 
     230            #pragma omp critical (_output) 
     231            { 
     232              info(50)<<"intercommCreate::client (server level 1) "<<rank_<<" intraCommSize : "<<intraCommSize 
     233                <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< sndServerGlobalRanks[i]<<endl ; 
     234            } 
     235            MPI_Intercomm_create(intraComm, 0, CXios::globalComm, sndServerGlobalRanks[i], 1, &newComm) ; 
     236            interCommRight.push_back(newComm) ; 
     237          } 
     238        } 
     239        else 
     240        { 
     241          int clientLeader; 
     242          clientLeader = leaders[hashString(CXios::xiosCodeId)]; 
     243          int intraCommSize, intraCommRank ; 
     244          MPI_Comm_size(intraComm, &intraCommSize) ; 
     245          MPI_Comm_rank(intraComm, &intraCommRank) ; 
     246          #pragma omp critical (_output) 
     247          { 
     248            info(50)<<"intercommCreate::server (server level 2) "<<rank_<<" intraCommSize : "<<intraCommSize 
     249                   <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ; 
     250          } 
     251 
     252          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 1, &newComm) ; 
     253          interCommLeft.push_back(newComm) ; 
     254        } 
     255 
     256        delete [] hashAll ; 
     257 
    105258      } 
    106259      // using OASIS 
    107 /*      else 
     260      else 
    108261      { 
    109         int rank ,size; 
    110262        int size; 
     263        int myColor; 
     264        int* srvGlobalRanks; 
    111265        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId); 
    112266 
     
    114268        MPI_Comm localComm; 
    115269        oasis_get_localcomm(localComm); 
    116         MPI_Comm_dup(localComm, &intraComm); 
    117  
    118         MPI_Comm_rank(intraComm,&rank) ; 
    119         MPI_Comm_size(intraComm,&size) ; 
     270        MPI_Comm_rank(localComm,&rank_) ; 
     271 
     272//      (1) Create server intraComm 
     273        if (!CXios::usingServer2) 
     274        { 
     275          MPI_Comm_dup(localComm, &intraComm); 
     276        } 
     277        else 
     278        { 
     279          int globalRank; 
     280          MPI_Comm_size(localComm,&size) ; 
     281          MPI_Comm_rank(CXios::globalComm,&globalRank) ; 
     282          srvGlobalRanks = new int[size] ; 
     283          MPI_Allgather(&globalRank, 1, MPI_INT, srvGlobalRanks, 1, MPI_INT, localComm) ; 
     284 
     285          int reqNbProc = size*CXios::ratioServer2/100.; 
     286          if (reqNbProc < 1 || reqNbProc == size) 
     287          { 
     288            error(0)<<"WARNING: void CServer::initialize(void)"<<endl 
     289                << "It is impossible to dedicate the requested number of processes = "<<reqNbProc 
     290                <<" to secondary server. XIOS will run in the classical server mode."<<endl; 
     291            MPI_Comm_dup(localComm, &intraComm); 
     292          } 
     293          else 
     294          { 
     295            int firstSndSrvRank = size*(100.-CXios::ratioServer2)/100. ; 
     296            int poolLeader = firstSndSrvRank; 
     297//*********** (1) Comment out the line below to set one process per pool 
     298//            sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]); 
     299            int nbPools = CXios::nbPoolsServer2; 
     300            if ( nbPools > reqNbProc || nbPools < 1) 
     301            { 
     302              error(0)<<"WARNING: void CServer::initialize(void)"<<endl 
     303                  << "It is impossible to allocate the requested number of pools = "<<nbPools 
     304                  <<" on the secondary server. It will be set so that there is one process per pool."<<endl; 
     305              nbPools = reqNbProc; 
     306            } 
     307            int remainder = ((int) (size*CXios::ratioServer2/100.)) % nbPools; 
     308            int procsPerPool = ((int) (size*CXios::ratioServer2/100.)) / nbPools; 
     309            for (int i=0; i<size; i++) 
     310            { 
     311              if (i >= firstSndSrvRank) 
     312              { 
     313                if (globalRank == srvGlobalRanks[i]) 
     314                { 
     315                  serverLevel=2; 
     316                } 
     317                poolLeader += procsPerPool; 
     318                if (remainder != 0) 
     319                { 
     320                  ++poolLeader; 
     321                  --remainder; 
     322                } 
     323//*********** (2) Comment out the two lines below to set one process per pool 
     324//                if (poolLeader < size) 
     325//                  sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]); 
     326//*********** (3) Uncomment the line below to set one process per pool 
     327                sndServerGlobalRanks.push_back(srvGlobalRanks[i]); 
     328              } 
     329              else 
     330              { 
     331                if (globalRank == srvGlobalRanks[i]) serverLevel=1; 
     332              } 
     333            } 
     334            if (serverLevel==2) 
     335            { 
     336              info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <<endl ; 
     337              for (int i=0; i<sndServerGlobalRanks.size(); i++) 
     338              { 
     339                if (globalRank>= sndServerGlobalRanks[i]) 
     340                { 
     341                  if (i == sndServerGlobalRanks.size()-1) 
     342                  { 
     343                    myColor = sndServerGlobalRanks[i]; 
     344                  } 
     345                  else if (globalRank< sndServerGlobalRanks[i+1]) 
     346                  { 
     347                    myColor = sndServerGlobalRanks[i]; 
     348                    break; 
     349                  } 
     350                } 
     351              } 
     352            } 
     353            if (serverLevel != 2) myColor=0; 
     354            MPI_Comm_split(localComm, myColor, rank_, &intraComm) ; 
     355          } 
     356        } 
     357 
    120358        string codesId=CXios::getin<string>("oasis_codes_id") ; 
    121  
    122359        vector<string> splitted ; 
    123360        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ; 
     
    128365        MPI_Comm_rank(CXios::globalComm,&globalRank); 
    129366 
     367//      (2) Create interComms with models 
    130368        for(it=splitted.begin();it!=splitted.end();it++) 
    131369        { 
    132370          oasis_get_intercomm(newComm,*it) ; 
    133           if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ; 
    134           MPI_Comm_remote_size(newComm,&size); 
    135           interComm.push_back(newComm) ; 
    136         } 
    137               oasis_enddef() ; 
     371          if ( serverLevel == 0 || serverLevel == 1) 
     372          { 
     373            interCommLeft.push_back(newComm) ; 
     374            if (rank_==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ; 
     375          } 
     376        } 
     377 
     378//      (3) Create interComms between primary and secondary servers 
     379        int intraCommSize, intraCommRank ; 
     380        MPI_Comm_size(intraComm,&intraCommSize) ; 
     381        MPI_Comm_rank(intraComm, &intraCommRank) ; 
     382 
     383        if (serverLevel == 1) 
     384        { 
     385          for (int i = 0; i < sndServerGlobalRanks.size(); ++i) 
     386          { 
     387            int srvSndLeader = sndServerGlobalRanks[i]; 
     388            info(50)<<"intercommCreate::client (server level 1) "<<globalRank<<" intraCommSize : "<<intraCommSize 
     389                <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< srvSndLeader<<endl ; 
     390            MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvSndLeader, 0, &newComm) ; 
     391            interCommRight.push_back(newComm) ; 
     392          } 
     393        } 
     394        else if (serverLevel == 2) 
     395        { 
     396          info(50)<<"intercommCreate::server (server level 2)"<<globalRank<<" intraCommSize : "<<intraCommSize 
     397                   <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< srvGlobalRanks[0] <<endl ; 
     398          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, srvGlobalRanks[0], 0, &newComm) ; 
     399          interCommLeft.push_back(newComm) ; 
     400        } 
     401        if (CXios::usingServer2) delete [] srvGlobalRanks ; 
     402        oasis_enddef() ; 
    138403      } 
    139 */ 
    140 //      int rank; 
    141       MPI_Comm_rank(intraComm,&rank) ; 
     404 
     405 
     406      MPI_Comm_rank(intraComm, &rank) ; 
    142407      if (rank==0) isRoot=true; 
    143408      else isRoot=false; 
     
    154419      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++) 
    155420        MPI_Comm_free(&(*it)); 
    156       for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++) 
     421 
     422      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++) 
    157423        MPI_Comm_free(&(*it)); 
     424 
     425//      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++) 
     426//        MPI_Comm_free(&(*it)); 
     427 
     428//        for (std::list<MPI_Comm>::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++) 
     429//          MPI_Comm_free(&(*it)); 
     430 
     431        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++) 
     432          MPI_Comm_free(&(*it)); 
     433 
    158434      MPI_Comm_free(&intraComm); 
    159435 
     
    179455         { 
    180456           listenContext(); 
     457           listenRootContext(); 
    181458           if (!finished) listenFinalize() ; 
    182459         } 
     
    196473     void CServer::listenFinalize(void) 
    197474     { 
    198         list<MPI_Comm>::iterator it; 
     475        list<MPI_Comm>::iterator it, itr; 
    199476        int msg ; 
    200477        int flag ; 
    201478 
    202         for(it=interComm.begin();it!=interComm.end();it++) 
     479        for(it=interCommLeft.begin();it!=interCommLeft.end();it++) 
    203480        { 
    204481           MPI_Status status ; 
     
    210487              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ; 
    211488              info(20)<<" CServer : Receive client finalize"<<endl ; 
     489              // Sending server finalize message to secondary servers (if any) 
     490              for(itr=interCommRight.begin();itr!=interCommRight.end();itr++) 
     491              { 
     492                MPI_Send(&msg,1,MPI_INT,0,0,*itr) ; 
     493              } 
    212494              MPI_Comm_free(&(*it)); 
    213               interComm.erase(it) ; 
     495              interCommLeft.erase(it) ; 
    214496              break ; 
    215497            } 
    216498         } 
    217499 
    218          if (interComm.empty()) 
     500         if (interCommLeft.empty()) 
    219501         { 
    220502           int i,size ; 
     
    300582     void CServer::recvContextMessage(void* buff,int count) 
    301583     { 
    302        static map<string,contextMessage> recvContextId ; 
     584       static map<string,contextMessage> recvContextId; 
    303585       map<string,contextMessage>::iterator it ; 
    304  
    305586       CBufferIn buffer(buff,count) ; 
    306587       string id ; 
     
    325606         int size ; 
    326607         MPI_Comm_size(intraComm,&size) ; 
    327          MPI_Request* requests= new MPI_Request[size-1] ; 
    328          MPI_Status* status= new MPI_Status[size-1] ; 
    329  
    330          for(int i=1;i<size;i++) 
    331          { 
    332             MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ; 
    333          } 
    334          MPI_Waitall(size-1,requests,status) ; 
    335          registerContext(buff,count,it->second.leaderRank) ; 
     608//         MPI_Request* requests= new MPI_Request[size-1] ; 
     609//         MPI_Status* status= new MPI_Status[size-1] ; 
     610         MPI_Request* requests= new MPI_Request[size] ; 
     611         MPI_Status* status= new MPI_Status[size] ; 
     612 
     613         CMessage msg ; 
     614         msg<<id<<it->second.leaderRank; 
     615         int messageSize=msg.size() ; 
     616         void * sendBuff = new char[messageSize] ; 
     617         CBufferOut sendBuffer(sendBuff,messageSize) ; 
     618         sendBuffer<<msg ; 
     619 
     620         // Include root itself in order not to have a divergence 
     621         for(int i=0; i<size; i++) 
     622         { 
     623           MPI_Isend(sendBuff,sendBuffer.count(),MPI_CHAR,i,2,intraComm,&requests[i]) ; 
     624         } 
    336625 
    337626         recvContextId.erase(it) ; 
     
    344633     void CServer::listenRootContext(void) 
    345634     { 
    346  
    347635       MPI_Status status ; 
    348636       int flag ; 
    349        static char* buffer ; 
    350        static MPI_Request request ; 
    351        static bool recept=false ; 
     637       static std::vector<void*> buffers; 
     638       static std::vector<MPI_Request> requests ; 
     639       static std::vector<int> counts ; 
     640       static std::vector<bool> isEventRegistered ; 
     641       static std::vector<bool> isEventQueued ; 
     642       MPI_Request request; 
     643 
    352644       int rank ; 
    353        int count ; 
    354645       const int root=0 ; 
    355  
    356        if (recept==false) 
    357        { 
    358          traceOff() ; 
    359          MPI_Iprobe(root,2,intraComm, &flag, &status) ; 
    360          traceOn() ; 
    361          if (flag==true) 
    362          { 
    363            MPI_Get_count(&status,MPI_CHAR,&count) ; 
    364            buffer=new char[count] ; 
    365            MPI_Irecv((void*)buffer,count,MPI_CHAR,root,2,intraComm,&request) ; 
    366            recept=true ; 
    367          } 
    368        } 
    369        else 
    370        { 
    371          MPI_Test(&request,&flag,&status) ; 
    372          if (flag==true) 
    373          { 
    374            MPI_Get_count(&status,MPI_CHAR,&count) ; 
    375            registerContext((void*)buffer,count) ; 
    376            delete [] buffer ; 
    377            recept=false ; 
    378          } 
    379        } 
     646       boost::hash<string> hashString; 
     647       size_t hashId = hashString("RegisterContext"); 
     648 
     649       // (1) Receive context id from the root, save it into a buffer 
     650       traceOff() ; 
     651       MPI_Iprobe(root,2,intraComm, &flag, &status) ; 
     652       traceOn() ; 
     653       if (flag==true) 
     654       { 
     655         counts.push_back(0); 
     656         MPI_Get_count(&status,MPI_CHAR,&(counts.back())) ; 
     657         buffers.push_back(new char[counts.back()]) ; 
     658         requests.push_back(request); 
     659         MPI_Irecv((void*)(buffers.back()),counts.back(),MPI_CHAR,root,2,intraComm,&(requests.back())) ; 
     660         isEventRegistered.push_back(false); 
     661         isEventQueued.push_back(false); 
     662         nbContexts++; 
     663       } 
     664 
     665       for (int ctxNb = 0; ctxNb < nbContexts; ctxNb++ ) 
     666       { 
     667         // (2) If context id is received, register an event 
     668         MPI_Test(&requests[ctxNb],&flag,&status) ; 
     669         if (flag==true && !isEventRegistered[ctxNb]) 
     670         { 
     671           eventScheduler->registerEvent(ctxNb,hashId); 
     672           isEventRegistered[ctxNb] = true; 
     673         } 
     674         // (3) If event has been scheduled, call register context 
     675         if (eventScheduler->queryEvent(ctxNb,hashId) && !isEventQueued[ctxNb]) 
     676         { 
     677           registerContext(buffers[ctxNb],counts[ctxNb]) ; 
     678           isEventQueued[ctxNb] = true; 
     679           delete [] buffers[ctxNb] ; 
     680         } 
     681       } 
     682 
    380683     } 
    381684 
     
    384687       string contextId; 
    385688       CBufferIn buffer(buff, count); 
    386        buffer >> contextId; 
     689//       buffer >> contextId; 
     690       buffer >> contextId>>leaderRank; 
     691       CContext* context; 
    387692 
    388693       info(20) << "CServer : Register new Context : " << contextId << endl; 
     
    392697               << "Context '" << contextId << "' has already been registred"); 
    393698 
    394        MPI_Comm contextIntercomm; 
    395        //MPI_Barrier(CXios::globalComm); 
    396         
    397        MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm); 
    398  
     699       context=CContext::create(contextId); 
     700       contextList[contextId]=context; 
     701 
     702       // Primary or classical server: create communication channel with a client 
     703       // (1) create interComm (with a client) 
     704       // (2) initialize client and server (contextClient and contextServer) 
    399705       MPI_Comm inter; 
    400        MPI_Intercomm_merge(contextIntercomm,1,&inter); 
    401        MPI_Barrier(inter); 
    402  
    403        CContext* context=CContext::create(contextId); 
    404        contextList[contextId]=context; 
    405        context->initServer(intraComm,contextIntercomm); 
    406  
    407        contextInterComms.push_back(contextIntercomm); 
    408        MPI_Comm_free(&inter); 
     706       if (serverLevel < 2) 
     707       { 
     708         MPI_Comm contextInterComm; 
     709         MPI_Intercomm_create(intraComm, 0, CXios::globalComm, leaderRank, 10+leaderRank, &contextInterComm); 
     710         MPI_Intercomm_merge(contextInterComm,1,&inter); 
     711         MPI_Barrier(inter); 
     712         MPI_Comm_free(&inter); 
     713         context->initServer(intraComm,contextInterComm); 
     714         contextInterComms.push_back(contextInterComm); 
     715 
     716       } 
     717       // Secondary server: create communication channel with a primary server 
     718       // (1) duplicate interComm with a primary server 
     719       // (2) initialize client and server (contextClient and contextServer) 
     720       // Remark: in the case of the secondary server there is no need to create an interComm calling MPI_Intercomm_create, 
     721       //         because interComm of CContext is defined on the same processes as the interComm of CServer. 
     722       //         So just duplicate it. 
     723       else if (serverLevel == 2) 
     724       { 
     725         MPI_Comm_dup(interCommLeft.front(), &inter); 
     726         contextInterComms.push_back(inter); 
     727         context->initServer(intraComm, contextInterComms.back()); 
     728       } 
     729 
     730       // Primary server: 
     731       // (1) send create context message to secondary servers 
     732       // (2) initialize communication channels with secondary servers (create contextClient and contextServer) 
     733       if (serverLevel == 1) 
     734       { 
     735         int i = 0, size; 
     736         MPI_Comm_size(intraComm, &size) ; 
     737         for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++, ++i) 
     738         { 
     739           StdString str = contextId +"_server_" + boost::lexical_cast<string>(i); 
     740           CMessage msg; 
     741           int messageSize; 
     742           msg<<str<<size<<rank_ ; 
     743           messageSize = msg.size() ; 
     744           buff = new char[messageSize] ; 
     745           CBufferOut buffer(buff,messageSize) ; 
     746           buffer<<msg ; 
     747           MPI_Send(buff, buffer.count(), MPI_CHAR, sndServerGlobalRanks[i], 1, CXios::globalComm) ; 
     748           MPI_Comm_dup(*it, &inter); 
     749           contextInterComms.push_back(inter); 
     750           MPI_Comm_dup(intraComm, &inter); 
     751           contextIntraComms.push_back(inter); 
     752           context->initClient(contextIntraComms.back(), contextInterComms.back()) ; 
     753           delete [] buff ; 
     754         } 
     755       } 
    409756     } 
    410757 
    411      void CServer::contextEventLoop(void) 
    412      { 
    413        bool finished ; 
     758     void CServer::contextEventLoop(bool enableEventsProcessing /*= true*/) 
     759     { 
     760       bool isFinalized ; 
    414761       map<string,CContext*>::iterator it ; 
     762 
    415763       for(it=contextList.begin();it!=contextList.end();it++) 
    416764       { 
    417          finished=it->second->checkBuffersAndListen(); 
    418          if (finished) 
     765         isFinalized=it->second->isFinalized(); 
     766         if (isFinalized) 
    419767         { 
    420768           contextList.erase(it) ; 
    421769           break ; 
    422770         } 
    423        } 
    424  
     771         else 
     772           it->second->checkBuffersAndListen(enableEventsProcessing); 
     773       } 
    425774     } 
    426775 
    427      //! Get rank of the current process 
     776     //! Get rank of the current process in the intraComm 
    428777     int CServer::getRank() 
    429778     { 
     779       int rank; 
     780       MPI_Comm_rank(intraComm,&rank); 
    430781       return rank; 
     782     } 
     783 
     784     vector<int>& CServer::getSecondaryServerGlobalRanks() 
     785     { 
     786       return sndServerGlobalRanks; 
    431787     } 
    432788 
     
    444800      int numDigit = 0; 
    445801      int size = 0; 
     802      int id; 
    446803      MPI_Comm_size(CXios::globalComm, &size); 
    447804      while (size) 
     
    450807        ++numDigit; 
    451808      } 
    452  
    453       fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext; 
     809      id = rank_; //getRank(); 
     810 
     811      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext; 
    454812      fb->open(fileNameClient.str().c_str(), std::ios::out); 
    455813      if (!fb->is_open()) 
Note: See TracChangeset for help on using the changeset viewer.