Ignore:
Timestamp:
04/14/17 16:53:56 (7 years ago)
Author:
mhnguyen
Message:

Updating 2-level server

+) Make some changes in the way data rebuilt on each level of server
+) Make some changes in the order of functions call during close context to make sure that each server receives the global indexes before calculating index to send to next level
+) Modify some functions to make sure data sent to the correct server pool

Test
+) On Curie
+) Only test_client

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_olga/src/node/axis.cpp

    r1054 r1099  
    2727      , isDistributed_(false), hasBounds_(false), isCompressible_(false) 
    2828      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0) 
    29       , transformationMap_(), hasValue(false) 
     29      , transformationMap_(), hasValue(false), doZoomByIndex_(false) 
    3030   { 
    3131   } 
     
    3737      , isDistributed_(false), hasBounds_(false), isCompressible_(false) 
    3838      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0) 
    39       , transformationMap_(), hasValue(false) 
     39      , transformationMap_(), hasValue(false), doZoomByIndex_(false) 
    4040   { 
    4141   } 
     
    334334       zoom_index.setValue(index.getValue()); 
    335335     } 
     336     if (zoom_n.isEmpty()) zoom_n.setValue(n); 
     337     if (zoom_begin.isEmpty()) zoom_begin.setValue(begin); 
    336338   } 
    337339 
     
    384386        switch(event.type) 
    385387        { 
    386            // case EVENT_ID_SERVER_ATTRIBUT : 
    387            //   recvServerAttribut(event); 
    388            //   return true; 
    389            //   break; 
     388           case EVENT_ID_DISTRIBUTION_ATTRIBUTE : 
     389             recvDistributionAttribute(event); 
     390             return true; 
     391             break; 
    390392          //  case EVENT_ID_INDEX: 
    391393          //   recvIndex(event); 
     
    440442 
    441443     if (this->isChecked) return; 
    442      if (context->hasClient) sendAttributes();     
     444     if (context->hasClient) sendAttributes(globalDim, orderPositionInGrid, distType);     
    443445 
    444446     this->isChecked = true; 
    445447   } 
    446448 
    447   void CAxis::sendAttributes() 
     449  void CAxis::sendAttributes(const std::vector<int>& globalDim, int orderPositionInGrid, 
     450                             CServerDistributionDescription::ServerDistributionType distType) 
    448451  { 
    449452     if (index.numElements() == n_glo.getValue()) 
    450453       sendNonDistributedAttributes(); 
    451454     else 
     455     { 
    452456       sendDistributedAttributes(); 
     457       sendDistributionAttribute(globalDim, orderPositionInGrid, distType); 
     458     } 
    453459  } 
    454460 
     
    457463  { 
    458464    CContext* context = CContext::getCurrent(); 
     465 
    459466    // int nbSrvPools = (context->hasServer) ? context->clientPrimServer.size() : 1; 
    460467    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1; 
     
    466473      int rank = client->clientRank; 
    467474 
    468       // size_t ni = this->n.getValue(); 
    469       // size_t ibegin = this->begin.getValue(); 
    470       // size_t zoom_end = global_zoom_begin+global_zoom_n-1; 
    471       // size_t nZoomCount = 0; 
     475      size_t ni = this->n.getValue(); 
     476      size_t ibegin = this->begin.getValue(); 
     477      size_t global_zoom_end = global_zoom_begin+global_zoom_n-1; 
     478      size_t nZoomCount = 0; 
    472479      size_t nbIndex = index.numElements(); 
    473       for (size_t idx = 0; idx < nbIndex; ++idx) 
    474       { 
    475         globalLocalIndexMap_[index(idx)] = idx; 
    476         // size_t globalIndex = index(idx); 
    477         // if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount; 
    478       } 
    479  
    480       // CArray<size_t,1> globalIndexAxis(nbIndex); 
    481       // std::vector<size_t> globalAxisZoom(nZoomCount); 
    482       // nZoomCount = 0; 
    483       // for (size_t idx = 0; idx < nbIndex; ++idx) 
    484       // { 
    485       //   size_t globalIndex = index(idx); 
    486       //   globalIndexAxis(idx) = globalIndex; 
    487       //   if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) 
    488       //   { 
    489       //     globalAxisZoom[nZoomCount] = globalIndex; 
    490       //     ++nZoomCount; 
    491       //   } 
    492       // } 
    493  
    494       // std::set<int> writtenInd; 
    495       // if (isCompressible_) 
    496       // { 
    497       //   for (int idx = 0; idx < data_index.numElements(); ++idx) 
    498       //   { 
    499       //     int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni); 
    500  
    501       //     if (ind >= 0 && ind < ni && mask(ind)) 
    502       //     { 
    503       //       ind += ibegin; 
    504       //       if (ind >= global_zoom_begin && ind <= zoom_end) 
    505       //         writtenInd.insert(ind); 
    506       //     } 
    507       //   } 
    508       // } 
     480 
     481      if (doZoomByIndex_)  
     482      { 
     483        nZoomCount = zoom_index.numElements(); 
     484      } 
     485      else 
     486      { 
     487        for (size_t idx = 0; idx < nbIndex; ++idx) 
     488        { 
     489          globalLocalIndexMap_[index(idx)] = idx; 
     490          size_t globalIndex = index(idx); 
     491          if (globalIndex >= global_zoom_begin && globalIndex <= global_zoom_end) ++nZoomCount; 
     492        } 
     493      } 
     494 
     495 
     496      CArray<size_t,1> globalIndexAxis(nbIndex); 
     497      std::vector<size_t> globalAxisZoom(nZoomCount); 
     498      nZoomCount = 0; 
     499      if (doZoomByIndex_)  
     500      { 
     501        int nbIndexZoom = zoom_index.numElements();         
     502        for (int i = 0; i < nbIndexZoom; ++i) 
     503        {    
     504          globalIndexAxis(i) = zoom_index(i); 
     505        } 
     506      } 
     507      else  
     508      { 
     509        for (size_t idx = 0; idx < nbIndex; ++idx) 
     510        { 
     511          size_t globalIndex = index(idx); 
     512          globalIndexAxis(idx) = globalIndex; 
     513          if (globalIndex >= global_zoom_begin && globalIndex <= global_zoom_end) 
     514          { 
     515            globalAxisZoom[nZoomCount] = globalIndex; 
     516            ++nZoomCount; 
     517          } 
     518        } 
     519 
     520        int end       = begin + n -1;         
     521        zoom_begin    = global_zoom_begin > begin ? global_zoom_begin : begin; 
     522        int zoom_end  = global_zoom_end < end ? zoom_end : end; 
     523        zoom_n        = zoom_end-zoom_begin+1; 
     524      } 
     525 
     526      std::set<int> writtenInd; 
     527      if (isCompressible_) 
     528      { 
     529        for (int idx = 0; idx < data_index.numElements(); ++idx) 
     530        { 
     531          int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni); 
     532 
     533          if (ind >= 0 && ind < ni && mask(ind)) 
     534          { 
     535            ind += ibegin; 
     536            if (ind >= global_zoom_begin && ind <= global_zoom_end) 
     537              writtenInd.insert(ind); 
     538          } 
     539        } 
     540      } 
    509541 
    510542      CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer, distType); 
     
    608640  } 
    609641 
     642 
     643  void CAxis::sendDistributionAttribute(const std::vector<int>& globalDim, int orderPositionInGrid, 
     644                                        CServerDistributionDescription::ServerDistributionType distType) 
     645  { 
     646    CContext* context = CContext::getCurrent(); 
     647 
     648    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 0) : 1; 
     649    for (int i = 0; i < nbSrvPools; ++i) 
     650    { 
     651      CContextClient* contextClientTmp = (context->hasServer) ? context->clientPrimServer[i] 
     652                                                                         : context->client; 
     653      int nbServer = contextClientTmp->serverSize; 
     654 
     655      CServerDistributionDescription serverDescription(globalDim, nbServer); 
     656      serverDescription.computeServerDistribution(); 
     657 
     658      std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin(); 
     659      std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes(); 
     660 
     661      globalDimGrid.resize(globalDim.size()); 
     662      for (int idx = 0; idx < globalDim.size(); ++idx) globalDimGrid(idx) = globalDim[idx]; 
     663 
     664      CEventClient event(getType(),EVENT_ID_DISTRIBUTION_ATTRIBUTE); 
     665      if (contextClientTmp->isServerLeader()) 
     666      { 
     667        std::list<CMessage> msgs; 
     668 
     669        const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 
     670        for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
     671        { 
     672          // Use const int to ensure CMessage holds a copy of the value instead of just a reference 
     673          const int begin = serverIndexBegin[*itRank][orderPositionInGrid]; 
     674          const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid]; 
     675          const int end   = begin + ni - 1; 
     676 
     677          msgs.push_back(CMessage()); 
     678          CMessage& msg = msgs.back(); 
     679          msg << this->getId(); 
     680          msg << ni << begin << end; 
     681          msg << global_zoom_begin.getValue() << global_zoom_n.getValue(); 
     682          msg << isCompressible_; 
     683          msg << orderPositionInGrid; 
     684          msg << globalDimGrid; 
     685 
     686          event.push(*itRank,1,msg); 
     687        } 
     688        contextClientTmp->sendEvent(event); 
     689      } 
     690      else contextClientTmp->sendEvent(event); 
     691    } 
     692  } 
    610693 
    611694  // void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid, 
     
    753836  { 
    754837    CContext* context = CContext::getCurrent(); 
     838 
    755839    // int nbSrvPools = (context->hasServer) ? context->clientPrimServer.size() : 1; 
    756840    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1; 
     
    859943    int ns, n, i, j, ind, nv, idx; 
    860944    CContext* context = CContext::getCurrent(); 
     945 
    861946    //int nbSrvPools = (context->hasServer) ? context->clientPrimServer.size() : 1; 
    862947    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1; 
     
    894979 
    895980        list_indi.push_back(CArray<int,1>(nbData)); 
    896         list_dataInd.push_back(CArray<int,1>(nbData)); 
    897         list_zoomInd.push_back(CArray<int,1>(nbData)); 
     981        list_dataInd.push_back(CArray<int,1>(nbData));         
    898982        list_mask.push_back(CArray<bool,1>(nbData)); 
    899983 
     984        if (doZoomByIndex_) 
     985          list_zoomInd.push_back(CArray<int,1>(nbData)); 
    900986 
    901987        if (hasValue) 
     
    908994 
    909995        CArray<int,1>& indi = list_indi.back(); 
    910         CArray<int,1>& dataIndi = list_dataInd.back(); 
    911         CArray<int,1>& zoomIndi = list_zoomInd.back(); 
     996        CArray<int,1>& dataIndi = list_dataInd.back();         
    912997        CArray<bool,1>& maskIndi = list_mask.back(); 
    913998 
     
    9201005          dataIndi(n) = dataIndex(ind); 
    9211006          maskIndi(n) = mask(ind); 
    922           zoomIndi(n) = zoom_index(ind); 
     1007 
     1008          if (doZoomByIndex_) 
     1009          { 
     1010            CArray<int,1>& zoomIndi = list_zoomInd.back(); 
     1011            zoomIndi(n) = zoom_index(ind); 
     1012          } 
    9231013 
    9241014          if (hasValue) 
     
    9381028        listData.push_back(CMessage()); 
    9391029        listData.back() << this->getId() 
    940                         << list_indi.back() << list_dataInd.back() << list_zoomInd.back() << list_mask.back() 
    941                         << hasValue; 
     1030                        << list_indi.back() << list_dataInd.back() << list_mask.back(); 
     1031 
     1032        listData.back() << doZoomByIndex_;            
     1033        if (doZoomByIndex_) 
     1034          listData.back() << list_zoomInd.back(); 
     1035 
     1036        listData.back() << hasValue; 
    9421037        if (hasValue) 
    9431038          listData.back() << list_val.back(); 
     1039 
    9441040        listData.back() << hasBounds_; 
    9451041        if (hasBounds_) 
     
    9821078      CBufferIn& buffer = *buffers[idx]; 
    9831079      buffer >> vec_indi[idx]; 
    984       buffer >> vec_dataInd[idx]; 
    985       buffer >> vec_zoomInd[idx]; 
     1080      buffer >> vec_dataInd[idx];       
    9861081      buffer >> vec_mask[idx]; 
     1082 
     1083      buffer >> doZoomByIndex_; 
     1084      if (doZoomByIndex_) 
     1085        buffer >> vec_zoomInd[idx]; 
    9871086 
    9881087      buffer >> hasValue; 
    9891088      if (hasValue) 
    9901089        buffer >> vec_val[idx]; 
     1090 
    9911091      buffer >> hasBounds_; 
    9921092      if (hasBounds_) 
     
    10501150    } 
    10511151 
    1052     int nbZoomIndex = 0; 
    1053     for (int idx = 0; idx < nbReceived; ++idx) 
    1054     { 
    1055       nbZoomIndex += vec_zoomInd[idx].numElements(); 
    1056     } 
    1057  
    1058     zoom_index.resize(nbZoomIndex); 
    1059     nbZoomIndex = 0; 
    1060     CArray<int,1>& zoom_Index_Tmp = this->zoom_index; 
    1061     for (int idx = 0; idx < nbReceived; ++idx) 
    1062     {       
    1063       CArray<int,1> tmp = zoom_Index_Tmp(Range(nbZoomIndex, nbZoomIndex + vec_zoomInd[idx].numElements()-1)); 
    1064       tmp = vec_zoomInd[idx]; 
    1065  
    1066       nbZoomIndex += vec_zoomInd[idx].numElements(); 
    1067     } 
    1068  
    1069  
    1070     { 
    1071       CContextServer* server = CContext::getCurrent()->server; 
    1072       count_write_index_ = zoom_index.numElements();       
    1073       MPI_Scan(&count_write_index_, &start_write_index_, 1, MPI_INT, MPI_SUM, server->intraComm); 
    1074       global_write_size_ = start_write_index_; 
    1075       start_write_index_ -= count_write_index_; 
    1076       local_write_size_ = count_write_index_;       
    1077     } 
    1078   } 
    1079  
    1080   // void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid, 
    1081   //                                CServerDistributionDescription::ServerDistributionType distType) 
    1082   // { 
    1083   //   CContext* context = CContext::getCurrent(); 
    1084  
    1085   //   CContextClient* contextClientTmp = (0 != context->clientPrimServer) ? context->clientPrimServer  
    1086   //                                                                       : context->client; 
    1087  
    1088      
    1089   //   int nbServer = contextClientTmp->serverSize; 
    1090  
    1091   //   CServerDistributionDescription serverDescription(globalDim, nbServer); 
    1092   //   serverDescription.computeServerDistribution(); 
    1093  
    1094   //   std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin(); 
    1095   //   std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes(); 
    1096  
    1097   //   globalDimGrid.resize(globalDim.size()); 
    1098   //   for (int idx = 0; idx < globalDim.size(); ++idx) globalDimGrid(idx) = globalDim[idx]; 
    1099  
    1100   //   CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT); 
    1101   //   if (contextClientTmp->isServerLeader()) 
    1102   //   { 
    1103   //     std::list<CMessage> msgs; 
    1104  
    1105   //     const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 
    1106   //     for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
    1107   //     { 
    1108   //       // Use const int to ensure CMessage holds a copy of the value instead of just a reference 
    1109   //       const int begin = serverIndexBegin[*itRank][orderPositionInGrid]; 
    1110   //       const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid]; 
    1111   //       const int end   = begin + ni - 1; 
    1112  
    1113   //       msgs.push_back(CMessage()); 
    1114   //       CMessage& msg = msgs.back(); 
    1115   //       msg << this->getId(); 
    1116   //       msg << ni << begin << end; 
    1117   //       msg << global_zoom_begin.getValue() << global_zoom_n.getValue(); 
    1118   //       msg << isCompressible_; 
    1119   //       msg << orderPositionInGrid; 
    1120   //       msg << globalDimGrid; 
    1121  
    1122   //       event.push(*itRank,1,msg); 
    1123   //     } 
    1124   //     contextClientTmp->sendEvent(event); 
    1125   //   } 
    1126   //   else contextClientTmp->sendEvent(event); 
    1127   // } 
    1128  
    1129   // void CAxis::recvServerAttribut(CEventServer& event) 
    1130   // { 
    1131   //   CBufferIn* buffer = event.subEvents.begin()->buffer; 
    1132   //   string axisId; 
    1133   //   *buffer >> axisId; 
    1134   //   get(axisId)->recvServerAttribut(*buffer); 
    1135  
    1136   //   CContext* context = CContext::getCurrent(); 
    1137   //   if (context->hasClient && context->hasServer) 
    1138   //   { 
    1139   //     std::vector<int> globalDim(get(axisId)->globalDimGrid.numElements()); 
    1140   //     for (int idx = 0; idx < globalDim.size(); ++idx) globalDim[idx] = get(axisId)->globalDimGrid(idx); 
    1141   //     get(axisId)->sendServerAttribut(globalDim, get(axisId)->orderPosInGrid,  
    1142   //                                     CServerDistributionDescription::BAND_DISTRIBUTION); 
    1143   //   } 
    1144   // } 
    1145  
    1146   // void CAxis::recvServerAttribut(CBufferIn& buffer) 
    1147   // { 
    1148   //   int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp; 
    1149  
    1150   //   buffer >> ni_srv >> begin_srv >> end_srv; 
    1151   //   buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp; 
    1152   //   buffer >> isCompressible_; 
    1153   //   buffer >> orderPosInGrid; 
    1154   //   buffer >> globalDimGrid; 
    1155  
    1156   //   global_zoom_begin = global_zoom_begin_tmp; 
    1157   //   global_zoom_n  = global_zoom_n_tmp; 
    1158   //   int global_zoom_end = global_zoom_begin + global_zoom_n - 1; 
    1159  
    1160   //   zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ; 
    1161   //   zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ; 
    1162   //   zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1; 
    1163  
    1164   //   if (zoom_size_srv<=0) 
    1165   //   { 
    1166   //     zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0; 
    1167   //   } 
    1168  
    1169   //   if (n_glo == n) 
    1170   //   { 
    1171   //     zoom_begin_srv = global_zoom_begin; 
    1172   //     zoom_end_srv   = global_zoom_end; //zoom_end; 
    1173   //     zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1; 
    1174   //   } 
    1175   //   if (hasValue) 
    1176   //   { 
    1177   //     value_srv.resize(zoom_size_srv); 
    1178   //     if (hasBounds_)  bound_srv.resize(2,zoom_size_srv); 
    1179   //   } 
    1180   // } 
     1152    if (doZoomByIndex_) 
     1153    { 
     1154      int nbZoomIndex = 0; 
     1155      for (int idx = 0; idx < nbReceived; ++idx) 
     1156      { 
     1157        nbZoomIndex += vec_zoomInd[idx].numElements(); 
     1158      } 
     1159 
     1160      zoom_index.resize(nbZoomIndex); 
     1161      nbZoomIndex = 0;       
     1162      for (int idx = 0; idx < nbReceived; ++idx) 
     1163      {       
     1164        CArray<int,1>& tmp = vec_zoomInd[idx]; 
     1165        for (int i = 0; i < tmp.size(); ++i) 
     1166        { 
     1167          zoom_index(nbZoomIndex) = tmp(i); 
     1168          ++nbZoomIndex; 
     1169        }        
     1170      } 
     1171    } 
     1172 
     1173 
     1174    // { 
     1175    //   CContextServer* server = CContext::getCurrent()->server; 
     1176    //   count_write_index_ = zoom_index.numElements();       
     1177    //   MPI_Scan(&count_write_index_, &start_write_index_, 1, MPI_INT, MPI_SUM, server->intraComm); 
     1178    //   global_write_size_ = start_write_index_; 
     1179    //   start_write_index_ -= count_write_index_; 
     1180    //   local_write_size_ = count_write_index_;       
     1181    // } 
     1182  } 
     1183 
     1184  void CAxis::recvDistributionAttribute(CEventServer& event) 
     1185  { 
     1186    CBufferIn* buffer = event.subEvents.begin()->buffer; 
     1187    string axisId; 
     1188    *buffer >> axisId; 
     1189    get(axisId)->recvDistributionAttribute(*buffer); 
     1190  } 
     1191 
     1192  void CAxis::recvDistributionAttribute(CBufferIn& buffer) 
     1193  { 
     1194    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp; 
     1195 
     1196    buffer >> ni_srv >> begin_srv >> end_srv; 
     1197    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp; 
     1198    buffer >> isCompressible_; 
     1199    buffer >> orderPosInGrid; 
     1200    buffer >> globalDimGrid; 
     1201 
     1202    n.setValue(ni_srv); 
     1203    begin.setValue(begin_srv); 
     1204    global_zoom_begin = global_zoom_begin_tmp; 
     1205    global_zoom_n  = global_zoom_n_tmp; 
     1206    int global_zoom_end = global_zoom_begin + global_zoom_n - 1; 
     1207 
     1208    zoom_begin = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ; 
     1209    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ; 
     1210    zoom_n  = zoom_end_srv - zoom_begin_srv + 1; 
     1211 
     1212    if (zoom_n<=0) 
     1213    { 
     1214      zoom_begin = 0; zoom_end_srv = 0; zoom_n = 0; 
     1215    } 
     1216 
     1217    if (n_glo == n) 
     1218    { 
     1219      zoom_begin = global_zoom_begin; 
     1220      zoom_end_srv   = global_zoom_end; //zoom_end; 
     1221      zoom_n     = zoom_end_srv - zoom_begin + 1; 
     1222    } 
     1223  } 
     1224 
    11811225 
    11821226  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id) 
Note: See TracChangeset for help on using the changeset viewer.