Ignore:
Timestamp:
07/09/15 14:05:43 (9 years ago)
Author:
mhnguyen
Message:

Correcting value written on an distributed axis

+) Seperate writing value on axis in multiple file and one file case

Test
+) On curie
+) test_complet and test_client pass

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/trunk/src/node/axis.cpp

    r631 r633  
    1212#include "zoom_axis.hpp" 
    1313#include "interpolate_axis.hpp" 
     14#include "server_distribution_description.hpp" 
     15#include "client_server_mapping_distributed.hpp" 
    1416 
    1517namespace xios { 
     
    2022      : CObjectTemplate<CAxis>() 
    2123      , CAxisAttributes(), isChecked(false), relFiles(), baseRefObject(), areClientAttributesChecked_(false) 
    22       , isDistributed_(false) 
     24      , isDistributed_(false), hasBounds_(false) 
    2325      , transformationMap_(), global_zoom_begin(0), global_zoom_size(0) 
    2426   { 
     
    2830      : CObjectTemplate<CAxis>(id) 
    2931      , CAxisAttributes(), isChecked(false), relFiles(), baseRefObject(), areClientAttributesChecked_(false) 
    30       , isDistributed_(false) 
     32      , isDistributed_(false), hasBounds_(false) 
    3133      , transformationMap_(), global_zoom_begin(0), global_zoom_size(0) 
    3234   { 
     
    99101      else this->ni.setValue(size); 
    100102 
    101 //      StdSize true_size = value.numElements(); 
    102 //      if (this->ni.getValue() != true_size) 
    103 //         ERROR("CAxis::checkAttributes(void)", 
    104 //               << "The array \'value\' of axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] has a different size that the one defined by the \'size\' attribute"); 
    105  
     103      StdSize true_size = value.numElements(); 
     104      if (this->ni.getValue() != true_size) 
     105         ERROR("CAxis::checkAttributes(void)", 
     106               << "The array \'value\' of axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] has a different size that the one defined by the \'size\' attribute"); 
    106107 
    107108      this->checkData(); 
     109      this->checkZoom(); 
    108110      this->checkMask(); 
    109       this->checkZoom(); 
    110  
    111       if (!bounds.isEmpty()) 
    112       { 
    113         if (bounds.extent(0) != size || bounds.extent(1) != 2) 
    114             ERROR("CAxis::checkAttributes(void)", 
    115                   << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension axis size x 2" << endl 
    116                   << "Axis size is " << size << endl 
    117                   << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1)); 
    118       } 
     111      this->checkBounds(); 
    119112   } 
    120113 
     
    185178   } 
    186179 
     180  void CAxis::checkBounds() 
     181  { 
     182    if (!bounds.isEmpty()) 
     183    { 
     184      if (bounds.extent(0) != ni || bounds.extent(1) != 2) 
     185          ERROR("CAxis::checkAttributes(void)", 
     186                << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension axis size x 2" << endl 
     187                << "Axis size is " << ni << endl 
     188                << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1)); 
     189      hasBounds_ = true; 
     190    } 
     191    else hasBounds_ = false; 
     192  } 
     193 
     194 
    187195  bool CAxis::dispatchEvent(CEventServer& event) 
    188196   { 
     
    196204             return true; 
    197205             break; 
     206           case EVENT_ID_INDEX: 
     207            recvIndex(event); 
     208            return true; 
     209            break; 
     210          case EVENT_ID_DISTRIBUTED_VALUE: 
     211            recvDistributedValue(event); 
     212            return true; 
     213            break; 
     214          case EVENT_ID_NON_DISTRIBUTED_VALUE: 
     215            recvNonDistributedValue(event); 
     216            return true; 
     217            break; 
    198218           default : 
    199219             ERROR("bool CContext::dispatchEvent(CEventServer& event)", 
     
    227247     { 
    228248       sendServerAttribut(globalDim, orderPositionInGrid, distType); 
     249       sendValue(); 
    229250     } 
    230251 
    231252     this->isChecked = true; 
    232253   } 
     254 
     255  void CAxis::sendValue() 
     256  { 
     257     if (ni.getValue() == size.getValue()) 
     258     { 
     259       sendNonDistributedValue(); 
     260     } 
     261     else 
     262     { 
     263       computeConnectedServer(); 
     264       sendDistributedValue(); 
     265     } 
     266  } 
     267 
     268  void CAxis::computeConnectedServer() 
     269  { 
     270    CContext* context = CContext::getCurrent(); 
     271    CContextClient* client = context->client; 
     272    int nbServer = client->serverSize; 
     273    int range, clientSize = client->clientSize; 
     274 
     275    CArray<size_t,1> globalIndexAxis(ni); 
     276    size_t ibegin = this->ibegin.getValue(); 
     277    int zoom_end = global_zoom_begin+global_zoom_size-1; 
     278    std::vector<size_t> globalAxisZoom; 
     279    for (size_t idx = 0; idx < ni; ++idx) 
     280    { 
     281      size_t globalIndex = ibegin + idx; 
     282      globalIndexAxis(idx) = globalIndex; 
     283      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) globalAxisZoom.push_back(globalIndex); 
     284    } 
     285 
     286    std::vector<int> nGlobDomain(1); 
     287    nGlobDomain[0] = size.getValue(); 
     288 
     289    size_t globalSizeIndex = 1, indexBegin, indexEnd; 
     290    for (int i = 0; i < nGlobDomain.size(); ++i) globalSizeIndex *= nGlobDomain[i]; 
     291    indexBegin = 0; 
     292    for (int i = 0; i < clientSize; ++i) 
     293    { 
     294      range = globalSizeIndex / clientSize; 
     295      if (i < (globalSizeIndex%clientSize)) ++range; 
     296      if (i == client->clientRank) break; 
     297      indexBegin += range; 
     298    } 
     299    indexEnd = indexBegin + range - 1; 
     300 
     301    CServerDistributionDescription serverDescription(nGlobDomain); 
     302    serverDescription.computeServerGlobalIndexInRange(nbServer, std::make_pair<size_t,size_t>(indexBegin, indexEnd)); 
     303    CClientServerMappingDistributed clientServerMap(serverDescription.getGlobalIndexRange(), client->intraComm); 
     304    clientServerMap.computeServerIndexMapping(globalIndexAxis); 
     305    const std::map<int, std::vector<size_t> >& globalIndexAxisOnServer = clientServerMap.getGlobalIndexOnServer(); 
     306 
     307    std::map<int, std::vector<size_t> >::const_iterator it = globalIndexAxisOnServer.begin(), 
     308                                                       ite = globalIndexAxisOnServer.end(); 
     309    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(), 
     310                                        iteVec = (globalAxisZoom).end(); 
     311    indSrv_.clear(); 
     312    for (; it != ite; ++it) 
     313    { 
     314      int rank = it->first; 
     315      const std::vector<size_t>& globalIndexTmp = it->second; 
     316      int nb = globalIndexTmp.size(); 
     317 
     318      for (int i = 0; i < nb; ++i) 
     319      { 
     320        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i])) 
     321        { 
     322          indSrv_[rank].push_back(globalIndexTmp[i]); 
     323        } 
     324      } 
     325    } 
     326 
     327    connectedServerRank_.clear(); 
     328    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) { 
     329      connectedServerRank_.push_back(it->first); 
     330    } 
     331 
     332    if (!indSrv_.empty()) 
     333    { 
     334      connectedServerRank_.clear(); 
     335      for (it = indSrv_.begin(); it != indSrv_.end(); ++it) 
     336        connectedServerRank_.push_back(it->first); 
     337    } 
     338    nbConnectedClients_ = clientServerMap.computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_); 
     339 
     340  } 
     341 
     342  void CAxis::sendNonDistributedValue() 
     343  { 
     344    CContext* context = CContext::getCurrent(); 
     345    CContextClient* client = context->client; 
     346    CEventClient event(getType(),EVENT_ID_NON_DISTRIBUTED_VALUE); 
     347 
     348    int zoom_end = global_zoom_begin+global_zoom_size-1; 
     349    int nb =0; 
     350    for (size_t idx = 0; idx < ni; ++idx) 
     351    { 
     352      size_t globalIndex = ibegin + idx; 
     353      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb; 
     354    } 
     355 
     356    CArray<double,1> val(nb); 
     357    nb = 0; 
     358    for (size_t idx = 0; idx < ni; ++idx) 
     359    { 
     360      size_t globalIndex = ibegin + idx; 
     361      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) 
     362      { 
     363        val(nb) = value(idx); 
     364        ++nb; 
     365      } 
     366    } 
     367 
     368    if (client->isServerLeader()) 
     369    { 
     370      std::list<CMessage> msgs; 
     371 
     372      const std::list<int>& ranks = client->getRanksServerLeader(); 
     373      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
     374      { 
     375        // Use const int to ensure CMessage holds a copy of the value instead of just a reference 
     376        msgs.push_back(CMessage()); 
     377        CMessage& msg = msgs.back(); 
     378        msg << this->getId(); 
     379        msg << val; 
     380        event.push(*itRank,1,msg); 
     381      } 
     382      client->sendEvent(event); 
     383    } 
     384    else client->sendEvent(event); 
     385  } 
     386 
     387  void CAxis::sendDistributedValue(void) 
     388  { 
     389    int ns, n, i, j, ind, nv, idx; 
     390    CContext* context = CContext::getCurrent(); 
     391    CContextClient* client=context->client; 
     392 
     393    // send value for each connected server 
     394    CEventClient eventIndex(getType(), EVENT_ID_INDEX); 
     395    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE); 
     396 
     397    list<CMessage> list_msgsIndex, list_msgsVal; 
     398    list<CArray<int,1> > list_indi; 
     399    list<CArray<double,1> > list_val; 
     400    list<CArray<double,2> > list_bounds; 
     401 
     402    std::map<int, std::vector<size_t> >::const_iterator it, iteMap; 
     403    iteMap = indSrv_.end(); 
     404    for (int k = 0; k < connectedServerRank_.size(); ++k) 
     405    { 
     406      int nbData = 0; 
     407      int rank = connectedServerRank_[k]; 
     408      it = indSrv_.find(rank); 
     409      if (iteMap != it) 
     410        nbData = it->second.size(); 
     411 
     412      list_indi.push_back(CArray<int,1>(nbData)); 
     413      list_val.push_back(CArray<double,1>(nbData)); 
     414 
     415      if (hasBounds_) 
     416      { 
     417        list_bounds.push_back(CArray<double,2>(2,nbData)); 
     418      } 
     419 
     420      CArray<int,1>& indi = list_indi.back(); 
     421      CArray<double,1>& val = list_val.back(); 
     422 
     423      for (n = 0; n < nbData; ++n) 
     424      { 
     425        idx = static_cast<int>(it->second[n]); 
     426        ind = idx - ibegin; 
     427 
     428        val(n) = value(ind); 
     429        indi(n) = idx; 
     430 
     431        if (hasBounds_) 
     432        { 
     433          CArray<double,2>& boundsVal = list_bounds.back(); 
     434          boundsVal(0, n) = bounds(0,n); 
     435          boundsVal(1, n) = bounds(1,n); 
     436        } 
     437      } 
     438 
     439      list_msgsIndex.push_back(CMessage()); 
     440      list_msgsIndex.back() << this->getId() << list_indi.back(); 
     441 
     442      list_msgsVal.push_back(CMessage()); 
     443      list_msgsVal.back() << this->getId() << list_val.back(); 
     444 
     445      if (hasBounds_) 
     446      { 
     447        list_msgsVal.back() << list_bounds.back(); 
     448      } 
     449 
     450      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back()); 
     451      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back()); 
     452    } 
     453 
     454    client->sendEvent(eventIndex); 
     455    client->sendEvent(eventVal); 
     456  } 
     457 
     458  void CAxis::recvIndex(CEventServer& event) 
     459  { 
     460    list<CEventServer::SSubEvent>::iterator it; 
     461    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 
     462    { 
     463      CBufferIn* buffer = it->buffer; 
     464      string domainId; 
     465      *buffer >> domainId; 
     466      get(domainId)->recvIndex(it->rank, *buffer); 
     467    } 
     468  } 
     469 
     470  void CAxis::recvIndex(int rank, CBufferIn& buffer) 
     471  { 
     472    buffer >> indiSrv_[rank]; 
     473  } 
     474 
     475  void CAxis::recvDistributedValue(CEventServer& event) 
     476  { 
     477    list<CEventServer::SSubEvent>::iterator it; 
     478    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 
     479    { 
     480      CBufferIn* buffer = it->buffer; 
     481      string domainId; 
     482      *buffer >> domainId; 
     483      get(domainId)->recvDistributedValue(it->rank, *buffer); 
     484    } 
     485  } 
     486 
     487  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer) 
     488  { 
     489    CArray<int,1> &indi = indiSrv_[rank]; 
     490    CArray<double,1> val; 
     491    CArray<double,2> boundsVal; 
     492 
     493    buffer >> val; 
     494    if (hasBounds_) buffer >> boundsVal; 
     495 
     496    int i, j, ind_srv; 
     497    for (int ind = 0; ind < indi.numElements(); ++ind) 
     498    { 
     499      i = indi(ind); 
     500      ind_srv = i - zoom_begin_srv; 
     501      value_srv(ind_srv) = val(ind); 
     502      if (hasBounds_) 
     503      { 
     504        bound_srv(0,ind_srv) = boundsVal(0, ind); 
     505        bound_srv(1,ind_srv) = boundsVal(1, ind); 
     506      } 
     507    } 
     508  } 
     509 
     510   void CAxis::recvNonDistributedValue(CEventServer& event) 
     511  { 
     512    list<CEventServer::SSubEvent>::iterator it; 
     513    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 
     514    { 
     515      CBufferIn* buffer = it->buffer; 
     516      string domainId; 
     517      *buffer >> domainId; 
     518      get(domainId)->recvNonDistributedValue(it->rank, *buffer); 
     519    } 
     520  } 
     521 
     522  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer) 
     523  { 
     524    CArray<double,1> val; 
     525    buffer >> val; 
     526 
     527    for (int ind = 0; ind < val.numElements(); ++ind) 
     528    { 
     529      value_srv(ind) = val(ind); 
     530      if (hasBounds_) 
     531      { 
     532        bound_srv(0,ind) = bounds(0,ind); 
     533        bound_srv(1,ind) = bounds(1,ind); 
     534      } 
     535    } 
     536  } 
    233537 
    234538  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid, 
     
    263567        msg << this->getId(); 
    264568        msg << ni << begin << end; 
    265         msg<<global_zoom_begin<<global_zoom_size; 
     569        msg << global_zoom_begin << global_zoom_size; 
    266570 
    267571        event.push(*itRank,1,msg); 
     
    304608      zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1; 
    305609    } 
     610    value_srv.resize(zoom_size_srv); 
     611    bound_srv.resize(2,zoom_size_srv); 
    306612  } 
    307613 
Note: See TracChangeset for help on using the changeset viewer.