source: XIOS/trunk/src/node/axis.cpp @ 1321

Last change on this file since 1321 was 1169, checked in by mhnguyen, 7 years ago

Non-continuous zoom on axis
Zoomed points are defined by array index of zoom_axis

+) Update axis with new type of zoom

Test
+) On Curie
+) Work
+) Update test_complete with this new zoom.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 37.5 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isClientAfterTransformationChecked(false)
27      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
29      , transformationMap_(), hasValue(false), hasLabel(false)
30   {
31   }
32
33   CAxis::CAxis(const StdString & id)
34      : CObjectTemplate<CAxis>(id)
35      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
36      , isClientAfterTransformationChecked(false)
37      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
38      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
39      , transformationMap_(), hasValue(false), hasLabel(false)
40   {
41   }
42
43   CAxis::~CAxis(void)
44   { /* Ne rien faire de plus */ }
45
46   std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
47   bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_);
48   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
49   {
50     m["zoom_axis"] = TRANS_ZOOM_AXIS;
51     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
52     m["inverse_axis"] = TRANS_INVERSE_AXIS;
53     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
54     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
55   }
56
57   ///---------------------------------------------------------------
58
59   const std::set<StdString> & CAxis::getRelFiles(void) const
60   {
61      return (this->relFiles);
62   }
63
64   bool CAxis::IsWritten(const StdString & filename) const
65   {
66      return (this->relFiles.find(filename) != this->relFiles.end());
67   }
68
69   bool CAxis::isWrittenCompressed(const StdString& filename) const
70   {
71      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
72   }
73
74   bool CAxis::isDistributed(void) const
75   {
76      return isDistributed_;
77   }
78
79   /*!
80    * Test whether the data defined on the axis can be outputted in a compressed way.
81    *
82    * \return true if and only if a mask was defined for this axis
83    */
84   bool CAxis::isCompressible(void) const
85   {
86      return isCompressible_;
87   }
88
89   void CAxis::addRelFile(const StdString & filename)
90   {
91      this->relFiles.insert(filename);
92   }
93
94   void CAxis::addRelFileCompressed(const StdString& filename)
95   {
96      this->relFilesCompressed.insert(filename);
97   }
98
99   //----------------------------------------------------------------
100
101   const std::vector<int>& CAxis::getIndexesToWrite(void) const
102   {
103     return indexesToWrite;
104   }
105
106   /*!
107     Returns the number of indexes written by each server.
108     \return the number of indexes written by each server
109   */
110   int CAxis::getNumberWrittenIndexes() const
111   {
112     return numberWrittenIndexes_;
113   }
114
115   /*!
116     Returns the total number of indexes written by the servers.
117     \return the total number of indexes written by the servers
118   */
119   int CAxis::getTotalNumberWrittenIndexes() const
120   {
121     return totalNumberWrittenIndexes_;
122   }
123
124   /*!
125     Returns the offset of indexes written by each server.
126     \return the offset of indexes written by each server
127   */
128   int CAxis::getOffsetWrittenIndexes() const
129   {
130     return offsetWrittenIndexes_;
131   }
132
133   //----------------------------------------------------------------
134
135   /*!
136    * Compute the minimum buffer size required to send the attributes to the server(s).
137    *
138    * \return A map associating the server rank with its minimum buffer size.
139    */
140   std::map<int, StdSize> CAxis::getAttributesBufferSize()
141   {
142     CContextClient* client = CContext::getCurrent()->client;
143
144     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
145
146     bool isNonDistributed = (n == n_glo);
147
148     if (client->isServerLeader())
149     {
150       // size estimation for sendServerAttribut
151       size_t size = 6 * sizeof(size_t);
152       // size estimation for sendNonDistributedValue
153       if (isNonDistributed)
154         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
155       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
156
157       const std::list<int>& ranks = client->getRanksServerLeader();
158       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
159       {
160         if (size > attributesSizes[*itRank])
161           attributesSizes[*itRank] = size;
162       }
163     }
164
165     if (!isNonDistributed)
166     {
167       // size estimation for sendDistributedValue
168       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
169       for (it = indSrv_.begin(); it != ite; ++it)
170       {
171         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
172         if (isCompressible_)
173           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
174
175         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
176         if (hasBounds_)
177           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
178 
179         if (hasLabel)
180           sizeValEvent += CArray<StdString,1>::size(it->second.size());
181
182         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
183         if (size > attributesSizes[it->first])
184           attributesSizes[it->first] = size;
185       }
186     }
187
188     return attributesSizes;
189   }
190
191   //----------------------------------------------------------------
192
193   StdString CAxis::GetName(void)   { return (StdString("axis")); }
194   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
195   ENodeType CAxis::GetType(void)   { return (eAxis); }
196
197   //----------------------------------------------------------------
198
199   CAxis* CAxis::createAxis()
200   {
201     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
202     return axis;
203   }
204
205   void CAxis::fillInValues(const CArray<double,1>& values)
206   {
207     this->value = values;
208   }
209
210   void CAxis::checkAttributes(void)
211   {
212      if (this->n_glo.isEmpty())
213        ERROR("CAxis::checkAttributes(void)",
214              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
215              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
216      StdSize size = this->n_glo.getValue();
217
218      if (!this->index.isEmpty())
219      {
220        if (n.isEmpty()) n = index.numElements();
221
222        // It's not so correct but if begin is not the first value of index
223        // then data on the local axis has user-defined distribution. In this case, begin has no meaning.
224        if (begin.isEmpty()) begin = index(0);         
225      }
226      else 
227      {
228        if (!this->begin.isEmpty())
229        {
230          if (begin < 0 || begin > size - 1)
231            ERROR("CAxis::checkAttributes(void)",
232                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
233                  << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
234        }
235        else this->begin.setValue(0);
236
237        if (!this->n.isEmpty())
238        {
239          if (n < 0 || n > size)
240            ERROR("CAxis::checkAttributes(void)",
241                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
242                  << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
243        }
244        else this->n.setValue(size);
245
246        {
247          index.resize(n);
248          for (int i = 0; i < n; ++i) index(i) = i+begin;
249        }
250      }
251
252      if (!this->value.isEmpty())
253      {
254        StdSize true_size = value.numElements();
255        if (this->n.getValue() != true_size)
256          ERROR("CAxis::checkAttributes(void)",
257                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
258                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
259        this->hasValue = true;
260      }
261
262      this->checkData();
263      this->checkZoom();
264      this->checkMask();
265      this->checkBounds();
266      this->checkLabel();
267
268      isDistributed_ = (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
269                       (!this->n.isEmpty() && (this->n != this->n_glo));
270
271      // A same stupid condition to make sure that if there is only one client, axis
272      // should be considered to be distributed. This should be a temporary solution     
273      isDistributed_ |= (1 == CContext::getCurrent()->client->clientSize);
274   }
275
276   void CAxis::checkData()
277   {
278      if (data_begin.isEmpty()) data_begin.setValue(0);
279
280      if (data_n.isEmpty())
281      {
282        data_n.setValue(n);
283      }
284      else if (data_n.getValue() < 0)
285      {
286        ERROR("CAxis::checkData(void)",
287              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
288              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
289      }
290
291      if (data_index.isEmpty())
292      {
293        data_index.resize(data_n);
294        for (int i = 0; i < data_n; ++i) data_index(i) = i;
295      }
296   }
297
298   void CAxis::checkZoom(void)
299   {
300     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
301     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
302   }
303
304   void CAxis::checkMask()
305   {
306      if (!mask.isEmpty())
307      {
308         if (mask.extent(0) != n)
309           ERROR("CAxis::checkMask(void)",
310                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
311                 << "The mask does not have the same size as the local domain." << std::endl
312                 << "Local size is " << n.getValue() << "." << std::endl
313                 << "Mask size is " << mask.extent(0) << ".");
314      }
315      else // (mask.isEmpty())
316      { // If no mask was defined, we create a default one without any masked point.
317         mask.resize(n);
318         for (int i = 0; i < n; ++i)
319         {
320           mask(i) = true;
321         }
322      }
323   }
324
325  void CAxis::checkBounds()
326  {
327    if (!bounds.isEmpty())
328    {
329      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
330        ERROR("CAxis::checkAttributes(void)",
331              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
332              << "Axis size is " << n.getValue() << "." << std::endl
333              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
334      hasBounds_ = true;
335    }
336    else hasBounds_ = false;
337  }
338
339  void CAxis::checkLabel()
340  {
341    if (!label.isEmpty())
342    {
343      if (label.extent(0) != n)
344        ERROR("CAxis::checkLabel(void)",
345              << "The label array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension of axis size." << std::endl
346              << "Axis size is " << n.getValue() << "." << std::endl
347              << "label size is "<< label.extent(0)<<  " .");
348      hasLabel = true;
349    }
350    else hasLabel = false;
351  }
352  void CAxis::checkEligibilityForCompressedOutput()
353  {
354    // We don't check if the mask is valid here, just if a mask has been defined at this point.
355    isCompressible_ = !mask.isEmpty();
356  }
357
358  bool CAxis::zoomByIndex()
359  {
360    return (!global_zoom_index.isEmpty() && (0 != global_zoom_index.numElements()));
361  }
362
363  bool CAxis::dispatchEvent(CEventServer& event)
364   {
365      if (SuperClass::dispatchEvent(event)) return true;
366      else
367      {
368        switch(event.type)
369        {
370           case EVENT_ID_SERVER_ATTRIBUT :
371             recvServerAttribut(event);
372             return true;
373             break;
374           case EVENT_ID_INDEX:
375            recvIndex(event);
376            return true;
377            break;
378          case EVENT_ID_DISTRIBUTED_VALUE:
379            recvDistributedValue(event);
380            return true;
381            break;
382          case EVENT_ID_NON_DISTRIBUTED_VALUE:
383            recvNonDistributedValue(event);
384            return true;
385            break;
386           default :
387             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
388                    << "Unknown Event");
389           return false;
390         }
391      }
392   }
393
394   void CAxis::checkAttributesOnClient()
395   {
396     if (this->areClientAttributesChecked_) return;
397
398     this->checkAttributes();
399
400     this->areClientAttributesChecked_ = true;
401   }
402
403   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
404                                                          CServerDistributionDescription::ServerDistributionType distType)
405   {
406     CContext* context=CContext::getCurrent() ;
407
408     if (this->isClientAfterTransformationChecked) return;
409     if (context->hasClient)
410     {
411       if (n.getValue() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
412     }
413
414     this->isClientAfterTransformationChecked = true;
415   }
416
417   // Send all checked attributes to server
418   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
419                                     CServerDistributionDescription::ServerDistributionType distType)
420   {
421     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
422     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
423     CContext* context = CContext::getCurrent();
424
425     if (this->isChecked) return;
426     if (context->hasClient)
427     {
428       sendServerAttribut(globalDim, orderPositionInGrid, distType);
429       if (hasValue) sendValue();
430     }
431
432     this->isChecked = true;
433   }
434
435  void CAxis::sendValue()
436  {
437     if (n.getValue() == n_glo.getValue())
438       sendNonDistributedValue();
439     else
440       sendDistributedValue();
441  }
442
443  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
444                                     CServerDistributionDescription::ServerDistributionType distType)
445  {
446    CContext* context = CContext::getCurrent();
447    CContextClient* client = context->client;
448    int nbServer = client->serverSize;
449    int range, clientSize = client->clientSize;
450    int rank = client->clientRank;
451
452    size_t ni = this->n.getValue();
453    size_t ibegin = this->begin.getValue();
454    size_t zoom_end = global_zoom_begin+global_zoom_n-1;
455    size_t nZoomCount = 0;
456    size_t nbIndex = index.numElements();
457
458    int end = (0 == n) ? begin : begin + n - 1;
459    int zoom_size = zoomByIndex() ? global_zoom_index.numElements() : global_zoom_n;
460    int minInd = min(index);
461    int maxInd = max(index);
462    for (size_t idx = 0; idx < zoom_size; ++idx)
463    {
464      size_t globalZoomIndex = zoomByIndex() ? global_zoom_index(idx) : global_zoom_begin + idx;
465      if (globalZoomIndex >= minInd && globalZoomIndex <= maxInd) ++nZoomCount;
466    }
467
468/*    for (size_t idx = 0; idx < nbIndex; ++idx)
469    {
470      size_t globalIndex = index(idx);
471      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
472    }*/
473   
474    CArray<size_t,1> globalIndexAxis(nbIndex);
475    for (size_t idx = 0; idx < nbIndex; ++idx)
476    {     
477      globalIndexAxis(idx) = (size_t)index(idx);
478    }
479
480    std::vector<size_t> globalAxisZoom(nZoomCount);
481    nZoomCount = 0;
482    for (size_t idx = 0; idx < zoom_size; ++idx)
483    {
484      size_t globalZoomIndex = zoomByIndex() ? global_zoom_index(idx) : global_zoom_begin + idx;
485      if (globalZoomIndex >= minInd && globalZoomIndex <= maxInd)
486      {
487        globalAxisZoom[nZoomCount] = globalZoomIndex;
488        ++nZoomCount;
489      } 
490    }
491
492    std::set<int> writtenInd;
493    if (isCompressible_)
494    {
495      for (int idx = 0; idx < data_index.numElements(); ++idx)
496      {
497        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
498
499        if (ind >= 0 && ind < ni && mask(ind))
500        {
501          ind += ibegin;
502          if (ind >= global_zoom_begin && ind <= zoom_end)
503            writtenInd.insert(ind);
504        }
505      }
506    }
507
508    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
509    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
510    CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
511    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
512    {
513      std::vector<int> nGlobAxis(1);
514      nGlobAxis[0] = n_glo.getValue();
515
516      size_t globalSizeIndex = 1, indexBegin, indexEnd;
517      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
518      indexBegin = 0;
519      if (globalSizeIndex <= clientSize)
520      {
521        indexBegin = rank%globalSizeIndex;
522        indexEnd = indexBegin;
523      }
524      else
525      {
526        for (int i = 0; i < clientSize; ++i)
527        {
528          range = globalSizeIndex / clientSize;
529          if (i < (globalSizeIndex%clientSize)) ++range;
530          if (i == client->clientRank) break;
531          indexBegin += range;
532        }
533        indexEnd = indexBegin + range - 1;
534      }
535
536      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
537      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
538      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
539      clientServerMap->computeServerIndexMapping(globalIndexAxis);
540      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
541      delete clientServerMap;
542    }
543    else
544    {
545      std::vector<size_t> globalIndexServer(n_glo.getValue());
546      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
547      {
548        globalIndexServer[idx] = idx;
549      }
550
551      for (int idx = 0; idx < nbServer; ++idx)
552      {
553        globalIndexAxisOnServer[idx] = globalIndexServer;
554      }
555    }
556
557    CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(),
558                                                         ite = globalIndexAxisOnServer.end();
559    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
560                                        iteVec = (globalAxisZoom).end();
561    indSrv_.clear();
562    indWrittenSrv_.clear();
563    for (; it != ite; ++it)
564    {
565      int rank = it->first;
566      const std::vector<size_t>& globalIndexTmp = it->second;
567      int nb = globalIndexTmp.size();
568
569      for (int i = 0; i < nb; ++i)
570      {
571        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
572        {
573          indSrv_[rank].push_back(globalIndexTmp[i]);
574        }
575
576        if (writtenInd.count(globalIndexTmp[i]))
577        {
578          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
579        }
580      }
581    }
582
583    connectedServerRank_.clear();
584    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
585      connectedServerRank_.push_back(it->first);
586    }
587
588    if (!indSrv_.empty())
589    {
590      std::map<int, vector<size_t> >::const_iterator itIndSrv  = indSrv_.begin(),
591                                                     iteIndSrv = indSrv_.end();
592      connectedServerRank_.clear();
593      for (; itIndSrv != iteIndSrv; ++itIndSrv)
594        connectedServerRank_.push_back(itIndSrv->first);
595    }
596    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
597  }
598
599  void CAxis::sendNonDistributedValue()
600  {
601    CContext* context = CContext::getCurrent();
602    CContextClient* client = context->client;
603    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
604
605    int zoom_end = global_zoom_begin + global_zoom_n - 1;
606    int nb = 0;
607/*    for (size_t idx = 0; idx < n; ++idx)
608    {
609      size_t globalIndex = begin + idx;
610      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
611    }*/
612
613    int end = (0 == n) ? begin : begin + n - 1;
614    int zoom_size = zoomByIndex() ? global_zoom_index.numElements() : global_zoom_n;
615    for (size_t idx = 0; idx < zoom_size; ++idx)
616    {
617      size_t globalZoomIndex = zoomByIndex() ? global_zoom_index(idx) : global_zoom_begin + idx;
618      if (globalZoomIndex >= begin && globalZoomIndex <= end) ++nb;
619    }
620
621    int nbWritten = 0;
622    if (isCompressible_)
623    {
624      for (int idx = 0; idx < data_index.numElements(); ++idx)
625      {
626        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
627
628        if (ind >= 0 && ind < n && mask(ind))
629        {
630          ind += begin;
631          if (ind >= global_zoom_begin && ind <= zoom_end)
632            ++nbWritten;
633        }
634      }
635    }
636
637    CArray<double,1> val(nb);
638    nb = 0;
639/*    for (size_t idx = 0; idx < n; ++idx)
640    {
641      size_t globalIndex = begin + idx;
642      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
643      {
644        val(nb) = value(idx);
645        ++nb;
646      }
647    }*/
648
649    for (size_t idx = 0; idx < zoom_size; ++idx)
650    {
651      size_t globalZoomIndex = zoomByIndex() ? global_zoom_index(idx) : global_zoom_begin + idx;
652      if (globalZoomIndex >= begin && globalZoomIndex <= end)
653      {
654        val(nb) = value(globalZoomIndex-begin);
655        ++nb;
656      }
657    }
658
659    CArray<int, 1> writtenInd(nbWritten);
660    nbWritten = 0;
661    if (isCompressible_)
662    {
663      for (int idx = 0; idx < data_index.numElements(); ++idx)
664      {
665        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
666
667        if (ind >= 0 && ind < n && mask(ind))
668        {
669          ind += begin;
670          if (ind >= global_zoom_begin && ind <= zoom_end)
671          {
672            writtenInd(nbWritten) = ind;
673            ++nbWritten;
674          }
675        }
676      }
677    }
678
679    if (client->isServerLeader())
680    {
681      std::list<CMessage> msgs;
682
683      const std::list<int>& ranks = client->getRanksServerLeader();
684      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
685      {
686        msgs.push_back(CMessage());
687        CMessage& msg = msgs.back();
688        msg << this->getId();
689        msg << val;
690        if (isCompressible_)
691          msg << writtenInd;
692        event.push(*itRank, 1, msg);
693      }
694      client->sendEvent(event);
695    }
696    else client->sendEvent(event);
697  }
698
699  void CAxis::sendDistributedValue(void)
700  {
701    int ns, n, i, j, ind, nv, idx;
702    CContext* context = CContext::getCurrent();
703    CContextClient* client=context->client;
704
705    // send value for each connected server
706    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
707    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
708
709    list<CMessage> list_msgsIndex, list_msgsVal;
710    list<CArray<int,1> > list_indi;
711    list<CArray<int,1> > list_writtenInd;
712    list<CArray<double,1> > list_val;
713    list<CArray<double,2> > list_bounds;
714    list<CArray<StdString,1> > list_label;
715
716    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
717    iteMap = indSrv_.end();
718    for (int k = 0; k < connectedServerRank_.size(); ++k)
719    {
720      int nbData = 0;
721      int rank = connectedServerRank_[k];
722      it = indSrv_.find(rank);
723      if (iteMap != it)
724        nbData = it->second.size();
725
726      list_indi.push_back(CArray<int,1>(nbData));
727      list_val.push_back(CArray<double,1>(nbData));
728
729      if (hasBounds_)
730      {
731        list_bounds.push_back(CArray<double,2>(2,nbData));
732      }
733     
734      if (hasLabel)
735      {
736        list_label.push_back(CArray<StdString,1>(nbData));
737      }
738
739      CArray<int,1>& indi = list_indi.back();
740      CArray<double,1>& val = list_val.back();
741
742      for (n = 0; n < nbData; ++n)
743      {
744        idx = static_cast<int>(it->second[n]);
745        ind = idx - begin;
746
747        val(n) = value(ind);
748        indi(n) = idx;
749
750        if (hasBounds_)
751        {
752          CArray<double,2>& boundsVal = list_bounds.back();
753          boundsVal(0, n) = bounds(0,n);
754          boundsVal(1, n) = bounds(1,n);
755        }
756       
757        if (hasLabel)
758        {
759          CArray<StdString,1>& labelVal = list_label.back();
760          labelVal(n) = label(n);
761        }
762      }
763
764      list_msgsIndex.push_back(CMessage());
765      list_msgsIndex.back() << this->getId() << list_indi.back();
766
767      if (isCompressible_)
768      {
769        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
770        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
771        CArray<int,1>& writtenInd = list_writtenInd.back();
772
773        for (n = 0; n < writtenInd.numElements(); ++n)
774          writtenInd(n) = writtenIndSrc[n];
775
776        list_msgsIndex.back() << writtenInd;
777      }
778
779      list_msgsVal.push_back(CMessage());
780      list_msgsVal.back() << this->getId() << list_val.back();
781
782      if (hasBounds_)
783      {
784        list_msgsVal.back() << list_bounds.back();
785      }
786 
787      if (hasLabel)
788      {
789        list_msgsVal.back() << list_label.back();
790      }
791
792      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
793      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
794    }
795
796    client->sendEvent(eventIndex);
797    client->sendEvent(eventVal);
798  }
799
800  void CAxis::recvIndex(CEventServer& event)
801  {
802    CAxis* axis;
803
804    list<CEventServer::SSubEvent>::iterator it;
805    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
806    {
807      CBufferIn* buffer = it->buffer;
808      string axisId;
809      *buffer >> axisId;
810      axis = get(axisId);
811      axis->recvIndex(it->rank, *buffer);
812    }
813
814    if (axis->isCompressible_)
815    {
816      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
817
818      CContextServer* server = CContext::getCurrent()->server;
819      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
820      MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
821      MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
822      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
823    }
824  }
825
826  void CAxis::recvIndex(int rank, CBufferIn& buffer)
827  {
828    buffer >> indiSrv_[rank];
829
830    if (isCompressible_)
831    {
832      CArray<int, 1> writtenIndexes;
833      buffer >> writtenIndexes;
834      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
835      for (int i = 0; i < writtenIndexes.numElements(); ++i)
836        indexesToWrite.push_back(writtenIndexes(i));
837    }
838  }
839
840  void CAxis::recvDistributedValue(CEventServer& event)
841  {
842    list<CEventServer::SSubEvent>::iterator it;
843    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
844    {
845      CBufferIn* buffer = it->buffer;
846      string axisId;
847      *buffer >> axisId;
848      get(axisId)->recvDistributedValue(it->rank, *buffer);
849    }
850  }
851
852  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
853  {
854    CArray<int,1> &indi = indiSrv_[rank];
855    CArray<double,1> val;
856    CArray<double,2> boundsVal;
857    CArray<StdString,1> labelVal;
858
859    buffer >> val;
860    if (hasBounds_) buffer >> boundsVal;
861    if (hasLabel) buffer >> labelVal;
862
863    int i, j, ind_srv;
864    for (int ind = 0; ind < indi.numElements(); ++ind)
865    {
866      i = indi(ind);
867      ind_srv = i - zoom_begin_srv;
868      value_srv(ind_srv) = val(ind);
869      if (hasBounds_)
870      {
871        bound_srv(0,ind_srv) = boundsVal(0, ind);
872        bound_srv(1,ind_srv) = boundsVal(1, ind);
873      }
874
875      if (hasLabel)
876      {
877        label_srv(ind_srv) = labelVal( ind);
878      }
879
880    }
881  }
882
883   void CAxis::recvNonDistributedValue(CEventServer& event)
884  {
885    CAxis* axis;
886
887    list<CEventServer::SSubEvent>::iterator it;
888    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
889    {
890      CBufferIn* buffer = it->buffer;
891      string axisId;
892      *buffer >> axisId;
893      axis = get(axisId);
894      axis->recvNonDistributedValue(it->rank, *buffer);
895    }
896
897    if (axis->isCompressible_)
898    {
899      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
900
901      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
902      axis->offsetWrittenIndexes_ = 0;
903    }
904  }
905
906  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
907  {
908    CArray<double,1> val;
909    buffer >> val;
910
911    for (int ind = 0; ind < val.numElements(); ++ind)
912    {
913      value_srv(ind) = val(ind);
914      if (hasBounds_)
915      {
916        bound_srv(0,ind) = bounds(0,ind);
917        bound_srv(1,ind) = bounds(1,ind);
918      }
919      if (hasLabel)
920      {
921        label_srv(ind) = label(ind);
922      }
923    }
924
925    if (isCompressible_)
926    {
927      CArray<int, 1> writtenIndexes;
928      buffer >> writtenIndexes;
929      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
930      for (int i = 0; i < writtenIndexes.numElements(); ++i)
931        indexesToWrite.push_back(writtenIndexes(i));
932    }
933  }
934
935  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
936                                 CServerDistributionDescription::ServerDistributionType distType)
937  {
938    CContext* context = CContext::getCurrent();
939    CContextClient* client = context->client;
940    int nbServer = client->serverSize;
941
942    CServerDistributionDescription serverDescription(globalDim, nbServer);
943    serverDescription.computeServerDistribution();
944
945    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
946    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
947
948    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
949    if (client->isServerLeader())
950    {
951      std::list<CMessage> msgs;
952
953      const std::list<int>& ranks = client->getRanksServerLeader();
954      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
955      {
956        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
957        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
958        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
959        const int end   = begin + ni - 1;
960        const bool zoomIndex = zoomByIndex();
961
962        msgs.push_back(CMessage());
963        CMessage& msg = msgs.back();
964        msg << this->getId();
965        msg << ni << begin << end;
966        msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
967        msg << isCompressible_;       
968        msg << zoomIndex;
969        if (zoomIndex)
970          msg << global_zoom_index.getValue();
971
972        event.push(*itRank,1,msg);
973      }
974      client->sendEvent(event);
975    }
976    else client->sendEvent(event);
977  }
978
979  void CAxis::recvServerAttribut(CEventServer& event)
980  {
981    CBufferIn* buffer = event.subEvents.begin()->buffer;
982    string axisId;
983    *buffer >> axisId;
984    get(axisId)->recvServerAttribut(*buffer);
985  }
986
987  void CAxis::recvServerAttribut(CBufferIn& buffer)
988  {
989    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
990    bool zoomIndex;   
991    CArray<int,1> zoom_index_recv;
992    std::vector<int> zoom_index_tmp;
993    std::vector<int>::iterator itZoomBeginSrv, itZoomEndSrv, itZoomSrv;
994
995    buffer >> ni_srv >> begin_srv >> end_srv;
996    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
997    buffer >> isCompressible_;
998    buffer >> zoomIndex;
999    if (zoomIndex)
1000    {
1001      buffer >> zoom_index_recv;
1002      global_zoom_index.reference(zoom_index_recv);
1003      zoom_index_tmp.resize(global_zoom_index.numElements());
1004      std::copy(global_zoom_index.begin(), global_zoom_index.end(), zoom_index_tmp.begin());
1005      std::sort(zoom_index_tmp.begin(), zoom_index_tmp.end());
1006      itZoomBeginSrv = std::lower_bound(zoom_index_tmp.begin(), zoom_index_tmp.end(), begin_srv);
1007      itZoomEndSrv   = std::upper_bound(zoom_index_tmp.begin(), zoom_index_tmp.end(), end_srv);     
1008      int sz = std::distance(itZoomBeginSrv, itZoomEndSrv);
1009      zoom_index_srv.resize(sz);
1010      itZoomSrv = itZoomBeginSrv;
1011      for (int i = 0; i < sz; ++i, ++itZoomSrv)
1012      {
1013        zoom_index_srv(i) = *(itZoomSrv);
1014      }
1015    }
1016
1017    global_zoom_begin = global_zoom_begin_tmp;
1018    global_zoom_n  = global_zoom_n_tmp;
1019    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
1020
1021    zoom_begin_srv = zoomIndex ? std::distance(itZoomBeginSrv, zoom_index_tmp.begin())
1022                                 : global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
1023    zoom_end_srv   = zoomIndex ? std::distance(zoom_index_tmp.begin(), itZoomEndSrv) - 1 
1024                                 : global_zoom_end < end_srv ? global_zoom_end : end_srv ;
1025    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
1026     
1027    global_zoom_begin_srv = zoomIndex ? 0 : global_zoom_begin ;
1028    global_zoom_size_srv  = zoomIndex ? zoom_index_tmp.size() : global_zoom_n;
1029
1030    if (zoom_size_srv<=0)
1031    {
1032      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
1033    }
1034
1035    if (n_glo == n)
1036    {
1037      zoom_begin_srv = zoomIndex ? std::distance(itZoomBeginSrv, zoom_index_tmp.begin())
1038                                   : global_zoom_begin;     
1039      zoom_size_srv  = zoomIndex ? zoom_index_tmp.size()
1040                                   : global_zoom_n;
1041    }
1042    if (hasValue)
1043    {
1044      value_srv.resize(zoom_size_srv);
1045      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
1046      if (hasLabel)  label_srv.resize(zoom_size_srv);
1047    }
1048  }
1049
1050  /*!
1051    Compare two axis objects.
1052    They are equal if only if they have identical attributes as well as their values.
1053    Moreover, they must have the same transformations.
1054  \param [in] axis Compared axis
1055  \return result of the comparison
1056  */
1057  bool CAxis::isEqual(CAxis* obj)
1058  {
1059    vector<StdString> excludedAttr;
1060    excludedAttr.push_back("axis_ref");
1061
1062    bool objEqual = SuperClass::isEqual(obj, excludedAttr);   
1063    if (!objEqual) return objEqual;
1064
1065    TransMapTypes thisTrans = this->getAllTransformations();
1066    TransMapTypes objTrans  = obj->getAllTransformations();
1067
1068    TransMapTypes::const_iterator it, itb, ite;
1069    std::vector<ETranformationType> thisTransType, objTransType;
1070    for (it = thisTrans.begin(); it != thisTrans.end(); ++it)
1071      thisTransType.push_back(it->first);
1072    for (it = objTrans.begin(); it != objTrans.end(); ++it)
1073      objTransType.push_back(it->first);
1074
1075    if (thisTransType.size() != objTransType.size()) return false;
1076    for (int idx = 0; idx < thisTransType.size(); ++idx)
1077      objEqual &= (thisTransType[idx] == objTransType[idx]);
1078
1079    return objEqual;
1080  }
1081
1082  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
1083  {
1084    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
1085    return transformationMap_.back().second;
1086  }
1087
1088  bool CAxis::hasTransformation()
1089  {
1090    return (!transformationMap_.empty());
1091  }
1092
1093  void CAxis::setTransformations(const TransMapTypes& axisTrans)
1094  {
1095    transformationMap_ = axisTrans;
1096  }
1097
1098  CAxis::TransMapTypes CAxis::getAllTransformations(void)
1099  {
1100    return transformationMap_;
1101  }
1102
1103  void CAxis::duplicateTransformation(CAxis* src)
1104  {
1105    if (src->hasTransformation())
1106    {
1107      this->setTransformations(src->getAllTransformations());
1108    }
1109  }
1110
1111  /*!
1112   * Go through the hierarchy to find the axis from which the transformations must be inherited
1113   */
1114  void CAxis::solveInheritanceTransformation()
1115  {
1116    if (hasTransformation() || !hasDirectAxisReference())
1117      return;
1118
1119    CAxis* axis = this;
1120    std::vector<CAxis*> refAxis;
1121    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
1122    {
1123      refAxis.push_back(axis);
1124      axis = axis->getDirectAxisReference();
1125    }
1126
1127    if (axis->hasTransformation())
1128      for (size_t i = 0; i < refAxis.size(); ++i)
1129        refAxis[i]->setTransformations(axis->getAllTransformations());
1130  }
1131
1132  void CAxis::parse(xml::CXMLNode & node)
1133  {
1134    SuperClass::parse(node);
1135
1136    if (node.goToChildElement())
1137    {
1138      StdString nodeElementName;
1139      do
1140      {
1141        StdString nodeId("");
1142        if (node.getAttributes().end() != node.getAttributes().find("id"))
1143        { nodeId = node.getAttributes()["id"]; }
1144
1145        nodeElementName = node.getElementName();
1146        std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_.end(), it;
1147        it = transformationMapList_.find(nodeElementName);
1148        if (ite != it)
1149        {
1150          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1151                                                                                                               nodeId,
1152                                                                                                               &node)));
1153        }
1154        else
1155        {
1156          ERROR("void CAxis::parse(xml::CXMLNode & node)",
1157                << "The transformation " << nodeElementName << " has not been supported yet.");
1158        }
1159      } while (node.goToNextElement()) ;
1160      node.goToParentElement();
1161    }
1162  }
1163
1164  DEFINE_REF_FUNC(Axis,axis)
1165
1166   ///---------------------------------------------------------------
1167
1168} // namespace xios
Note: See TracBrowser for help on using the repository browser.