Ignore:
Timestamp:
08/25/15 16:52:45 (9 years ago)
Author:
rlacroix
Message:

Add support for indexed output.

If the new field attribute "indexed_output" is set to true and a mask is defined (either at grid, domain or axis level), the indexed data will be outputed instead of the full data with missing values.

See http://cfconventions.org/Data/cf-conventions/cf-conventions-1.5/build/cf-conventions.html#compression-by-gathering for more information.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/trunk/src/node/axis.cpp

    r670 r676  
    88#include "context.hpp" 
    99#include "context_client.hpp" 
     10#include "context_server.hpp" 
    1011#include "xios_spl.hpp" 
    1112#include "inverse_axis.hpp" 
     
    1415#include "server_distribution_description.hpp" 
    1516#include "client_server_mapping_distributed.hpp" 
     17#include "distribution_client.hpp" 
    1618 
    1719namespace xios { 
     
    2224      : CObjectTemplate<CAxis>() 
    2325      , CAxisAttributes(), isChecked(false), relFiles(), baseRefObject(), areClientAttributesChecked_(false) 
    24       , isDistributed_(false), hasBounds_(false) 
     26      , isDistributed_(false), hasBounds_(false), isCompressible_(false) 
     27      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0) 
    2528      , transformationMap_(), global_zoom_begin(0), global_zoom_size(0) 
    2629   { 
     
    3033      : CObjectTemplate<CAxis>(id) 
    3134      , CAxisAttributes(), isChecked(false), relFiles(), baseRefObject(), areClientAttributesChecked_(false) 
    32       , isDistributed_(false), hasBounds_(false) 
     35      , isDistributed_(false), hasBounds_(false), isCompressible_(false) 
     36      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0) 
    3337      , transformationMap_(), global_zoom_begin(0), global_zoom_size(0) 
    3438   { 
     
    5054   } 
    5155 
     56   bool CAxis::isWrittenCompressed(const StdString& filename) const 
     57   { 
     58      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end()); 
     59   } 
     60 
    5261   bool CAxis::isDistributed(void) const 
    5362   { 
     
    5564   } 
    5665 
     66   /*! 
     67    * Test whether the data defined on the axis can be outputted in a compressed way. 
     68    *  
     69    * \return true if and only if a mask was defined for this axis 
     70    */ 
     71   bool CAxis::isCompressible(void) const 
     72   { 
     73      return isCompressible_; 
     74   } 
     75 
    5776   void CAxis::addRelFile(const StdString & filename) 
    5877   { 
    5978      this->relFiles.insert(filename); 
     79   } 
     80 
     81   void CAxis::addRelFileCompressed(const StdString& filename) 
     82   { 
     83      this->relFilesCompressed.insert(filename); 
     84   } 
     85 
     86   //---------------------------------------------------------------- 
     87 
     88   const std::vector<int>& CAxis::getIndexesToWrite(void) const 
     89   { 
     90     return indexesToWrite; 
     91   } 
     92 
     93   /*! 
     94     Returns the number of indexes written by each server. 
     95     \return the number of indexes written by each server 
     96   */ 
     97   int CAxis::getNumberWrittenIndexes() const 
     98   { 
     99     return numberWrittenIndexes_; 
     100   } 
     101 
     102   /*! 
     103     Returns the total number of indexes written by the servers. 
     104     \return the total number of indexes written by the servers 
     105   */ 
     106   int CAxis::getTotalNumberWrittenIndexes() const 
     107   { 
     108     return totalNumberWrittenIndexes_; 
     109   } 
     110 
     111   /*! 
     112     Returns the offset of indexes written by each server. 
     113     \return the offset of indexes written by each server 
     114   */ 
     115   int CAxis::getOffsetWrittenIndexes() const 
     116   { 
     117     return offsetWrittenIndexes_; 
    60118   } 
    61119 
     
    171229  } 
    172230 
     231  void CAxis::checkEligibilityForCompressedOutput() 
     232  { 
     233    // We don't check if the mask is valid here, just if a mask has been defined at this point. 
     234    isCompressible_ = !mask.isEmpty(); 
     235  } 
    173236 
    174237  bool CAxis::dispatchEvent(CEventServer& event) 
     
    277340    } 
    278341 
     342    std::set<int> writtenInd; 
     343    if (isCompressible_) 
     344    { 
     345      for (int idx = 0; idx < data_index.numElements(); ++idx) 
     346      { 
     347        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni); 
     348 
     349        if (ind >= 0 && ind < ni && mask(ind)) 
     350        { 
     351          ind += ibegin; 
     352          if (ind >= global_zoom_begin && ind <= zoom_end) 
     353            writtenInd.insert(ind); 
     354        } 
     355      } 
     356    } 
     357 
    279358    std::vector<int> nGlobDomain(1); 
    280359    nGlobDomain[0] = n_glo.getValue(); 
     
    303382                                        iteVec = (globalAxisZoom).end(); 
    304383    indSrv_.clear(); 
     384    indWrittenSrv_.clear(); 
    305385    for (; it != ite; ++it) 
    306386    { 
     
    315395          indSrv_[rank].push_back(globalIndexTmp[i]); 
    316396        } 
     397 
     398        if (writtenInd.count(globalIndexTmp[i])) 
     399        { 
     400          indWrittenSrv_[rank].push_back(globalIndexTmp[i]); 
     401        } 
    317402      } 
    318403    } 
     
    337422    CContext* context = CContext::getCurrent(); 
    338423    CContextClient* client = context->client; 
    339     CEventClient event(getType(),EVENT_ID_NON_DISTRIBUTED_VALUE); 
    340  
    341     int zoom_end = global_zoom_begin+global_zoom_size-1; 
    342     int nb =0; 
     424    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE); 
     425 
     426    int zoom_end = global_zoom_begin + global_zoom_size - 1; 
     427    int nb = 0; 
    343428    for (size_t idx = 0; idx < n; ++idx) 
    344429    { 
    345430      size_t globalIndex = begin + idx; 
    346431      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb; 
     432    } 
     433 
     434    int nbWritten = 0; 
     435    if (isCompressible_) 
     436    { 
     437      for (int idx = 0; idx < data_index.numElements(); ++idx) 
     438      { 
     439        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n); 
     440 
     441        if (ind >= 0 && ind < n && mask(ind)) 
     442        { 
     443          ind += begin; 
     444          if (ind >= global_zoom_begin && ind <= zoom_end) 
     445            ++nbWritten; 
     446        } 
     447      } 
    347448    } 
    348449 
     
    359460    } 
    360461 
     462    CArray<int, 1> writtenInd(nbWritten); 
     463    nbWritten = 0; 
     464    if (isCompressible_) 
     465    { 
     466      for (int idx = 0; idx < data_index.numElements(); ++idx) 
     467      { 
     468        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n); 
     469 
     470        if (ind >= 0 && ind < n && mask(ind)) 
     471        { 
     472          ind += begin; 
     473          if (ind >= global_zoom_begin && ind <= zoom_end) 
     474          { 
     475            writtenInd(nbWritten) = ind; 
     476            ++nbWritten; 
     477          } 
     478        } 
     479      } 
     480    } 
     481 
    361482    if (client->isServerLeader()) 
    362483    { 
     
    366487      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
    367488      { 
    368         // Use const int to ensure CMessage holds a copy of the value instead of just a reference 
    369489        msgs.push_back(CMessage()); 
    370490        CMessage& msg = msgs.back(); 
    371491        msg << this->getId(); 
    372492        msg << val; 
    373         event.push(*itRank,1,msg); 
     493        if (isCompressible_) 
     494          msg << writtenInd; 
     495        event.push(*itRank, 1, msg); 
    374496      } 
    375497      client->sendEvent(event); 
     
    390512    list<CMessage> list_msgsIndex, list_msgsVal; 
    391513    list<CArray<int,1> > list_indi; 
     514    list<CArray<int,1> > list_writtenInd; 
    392515    list<CArray<double,1> > list_val; 
    393516    list<CArray<double,2> > list_bounds; 
     
    433556      list_msgsIndex.back() << this->getId() << list_indi.back(); 
    434557 
     558      if (isCompressible_) 
     559      { 
     560        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank]; 
     561        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size())); 
     562        CArray<int,1>& writtenInd = list_writtenInd.back(); 
     563 
     564        for (n = 0; n < writtenInd.numElements(); ++n) 
     565          writtenInd(n) = writtenIndSrc[n]; 
     566 
     567        list_msgsIndex.back() << writtenInd; 
     568      } 
     569 
    435570      list_msgsVal.push_back(CMessage()); 
    436571      list_msgsVal.back() << this->getId() << list_val.back(); 
     
    451586  void CAxis::recvIndex(CEventServer& event) 
    452587  { 
     588    CAxis* axis; 
     589 
    453590    list<CEventServer::SSubEvent>::iterator it; 
    454591    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 
    455592    { 
    456593      CBufferIn* buffer = it->buffer; 
    457       string domainId; 
    458       *buffer >> domainId; 
    459       get(domainId)->recvIndex(it->rank, *buffer); 
     594      string axisId; 
     595      *buffer >> axisId; 
     596      axis = get(axisId); 
     597      axis->recvIndex(it->rank, *buffer); 
     598    } 
     599 
     600    if (axis->isCompressible_) 
     601    { 
     602      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end()); 
     603 
     604      CContextServer* server = CContext::getCurrent()->server; 
     605      axis->numberWrittenIndexes_ = axis->indexesToWrite.size(); 
     606      MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm); 
     607      MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm); 
     608      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_; 
    460609    } 
    461610  } 
     
    464613  { 
    465614    buffer >> indiSrv_[rank]; 
     615 
     616    if (isCompressible_) 
     617    { 
     618      CArray<int, 1> writtenIndexes; 
     619      buffer >> writtenIndexes; 
     620      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements()); 
     621      for (int i = 0; i < writtenIndexes.numElements(); ++i) 
     622        indexesToWrite.push_back(writtenIndexes(i)); 
     623    } 
    466624  } 
    467625 
     
    472630    { 
    473631      CBufferIn* buffer = it->buffer; 
    474       string domainId; 
    475       *buffer >> domainId; 
    476       get(domainId)->recvDistributedValue(it->rank, *buffer); 
     632      string axisId; 
     633      *buffer >> axisId; 
     634      get(axisId)->recvDistributedValue(it->rank, *buffer); 
    477635    } 
    478636  } 
     
    503661   void CAxis::recvNonDistributedValue(CEventServer& event) 
    504662  { 
     663    CAxis* axis; 
     664 
    505665    list<CEventServer::SSubEvent>::iterator it; 
    506666    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 
    507667    { 
    508668      CBufferIn* buffer = it->buffer; 
    509       string domainId; 
    510       *buffer >> domainId; 
    511       get(domainId)->recvNonDistributedValue(it->rank, *buffer); 
     669      string axisId; 
     670      *buffer >> axisId; 
     671      axis = get(axisId); 
     672      axis->recvNonDistributedValue(it->rank, *buffer); 
     673    } 
     674 
     675    if (axis->isCompressible_) 
     676    { 
     677      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end()); 
     678 
     679      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size(); 
     680      axis->offsetWrittenIndexes_ = 0; 
    512681    } 
    513682  } 
     
    526695        bound_srv(1,ind) = bounds(1,ind); 
    527696      } 
     697    } 
     698 
     699    if (isCompressible_) 
     700    { 
     701      CArray<int, 1> writtenIndexes; 
     702      buffer >> writtenIndexes; 
     703      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements()); 
     704      for (int i = 0; i < writtenIndexes.numElements(); ++i) 
     705        indexesToWrite.push_back(writtenIndexes(i)); 
    528706    } 
    529707  } 
     
    561739        msg << ni << begin << end; 
    562740        msg << global_zoom_begin << global_zoom_size; 
     741        msg << isCompressible_; 
    563742 
    564743        event.push(*itRank,1,msg); 
     
    581760    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_size_tmp; 
    582761 
    583     buffer>>ni_srv>>begin_srv>>end_srv>>global_zoom_begin_tmp>>global_zoom_size_tmp; 
     762    buffer >> ni_srv >> begin_srv >> end_srv; 
     763    buffer >> global_zoom_begin_tmp >> global_zoom_size_tmp; 
     764    buffer >> isCompressible_; 
    584765    global_zoom_begin = global_zoom_begin_tmp; 
    585766    global_zoom_size  = global_zoom_size_tmp; 
Note: See TracChangeset for help on using the changeset viewer.