source: XIOS/trunk/src/node/axis.cpp @ 1106

Last change on this file since 1106 was 1106, checked in by mhnguyen, 7 years ago

Fixing bug of referencing to an object.

+) From now on, two objects of a same grid element (domain, axis, scalar) are equal if
they have the same non-empty attributes and the same transformations.
(This is very common case with inheritance by *_ref).

Test
+) On Curie
+) Ok with toy_cmip6

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 34.6 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isClientAfterTransformationChecked(false)
27      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
29      , transformationMap_(), hasValue(false), hasLabel(false)
30   {
31   }
32
33   CAxis::CAxis(const StdString & id)
34      : CObjectTemplate<CAxis>(id)
35      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
36      , isClientAfterTransformationChecked(false)
37      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
38      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
39      , transformationMap_(), hasValue(false), hasLabel(false)
40   {
41   }
42
43   CAxis::~CAxis(void)
44   { /* Ne rien faire de plus */ }
45
46   std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
47   bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_);
48   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
49   {
50     m["zoom_axis"] = TRANS_ZOOM_AXIS;
51     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
52     m["inverse_axis"] = TRANS_INVERSE_AXIS;
53     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
54     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
55   }
56
57   ///---------------------------------------------------------------
58
59   const std::set<StdString> & CAxis::getRelFiles(void) const
60   {
61      return (this->relFiles);
62   }
63
64   bool CAxis::IsWritten(const StdString & filename) const
65   {
66      return (this->relFiles.find(filename) != this->relFiles.end());
67   }
68
69   bool CAxis::isWrittenCompressed(const StdString& filename) const
70   {
71      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
72   }
73
74   bool CAxis::isDistributed(void) const
75   {
76      return isDistributed_;
77   }
78
79   /*!
80    * Test whether the data defined on the axis can be outputted in a compressed way.
81    *
82    * \return true if and only if a mask was defined for this axis
83    */
84   bool CAxis::isCompressible(void) const
85   {
86      return isCompressible_;
87   }
88
89   void CAxis::addRelFile(const StdString & filename)
90   {
91      this->relFiles.insert(filename);
92   }
93
94   void CAxis::addRelFileCompressed(const StdString& filename)
95   {
96      this->relFilesCompressed.insert(filename);
97   }
98
99   //----------------------------------------------------------------
100
101   const std::vector<int>& CAxis::getIndexesToWrite(void) const
102   {
103     return indexesToWrite;
104   }
105
106   /*!
107     Returns the number of indexes written by each server.
108     \return the number of indexes written by each server
109   */
110   int CAxis::getNumberWrittenIndexes() const
111   {
112     return numberWrittenIndexes_;
113   }
114
115   /*!
116     Returns the total number of indexes written by the servers.
117     \return the total number of indexes written by the servers
118   */
119   int CAxis::getTotalNumberWrittenIndexes() const
120   {
121     return totalNumberWrittenIndexes_;
122   }
123
124   /*!
125     Returns the offset of indexes written by each server.
126     \return the offset of indexes written by each server
127   */
128   int CAxis::getOffsetWrittenIndexes() const
129   {
130     return offsetWrittenIndexes_;
131   }
132
133   //----------------------------------------------------------------
134
135   /*!
136    * Compute the minimum buffer size required to send the attributes to the server(s).
137    *
138    * \return A map associating the server rank with its minimum buffer size.
139    */
140   std::map<int, StdSize> CAxis::getAttributesBufferSize()
141   {
142     CContextClient* client = CContext::getCurrent()->client;
143
144     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
145
146     bool isNonDistributed = (n == n_glo);
147
148     if (client->isServerLeader())
149     {
150       // size estimation for sendServerAttribut
151       size_t size = 6 * sizeof(size_t);
152       // size estimation for sendNonDistributedValue
153       if (isNonDistributed)
154         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
155       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
156
157       const std::list<int>& ranks = client->getRanksServerLeader();
158       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
159       {
160         if (size > attributesSizes[*itRank])
161           attributesSizes[*itRank] = size;
162       }
163     }
164
165     if (!isNonDistributed)
166     {
167       // size estimation for sendDistributedValue
168       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
169       for (it = indSrv_.begin(); it != ite; ++it)
170       {
171         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
172         if (isCompressible_)
173           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
174
175         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
176         if (hasBounds_)
177           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
178 
179         if (hasLabel)
180           sizeValEvent += CArray<StdString,1>::size(it->second.size());
181
182         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
183         if (size > attributesSizes[it->first])
184           attributesSizes[it->first] = size;
185       }
186     }
187
188     return attributesSizes;
189   }
190
191   //----------------------------------------------------------------
192
193   StdString CAxis::GetName(void)   { return (StdString("axis")); }
194   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
195   ENodeType CAxis::GetType(void)   { return (eAxis); }
196
197   //----------------------------------------------------------------
198
199   CAxis* CAxis::createAxis()
200   {
201     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
202     return axis;
203   }
204
205   void CAxis::fillInValues(const CArray<double,1>& values)
206   {
207     this->value = values;
208   }
209
210   void CAxis::checkAttributes(void)
211   {
212      if (this->n_glo.isEmpty())
213        ERROR("CAxis::checkAttributes(void)",
214              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
215              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
216      StdSize size = this->n_glo.getValue();
217
218      if (!this->index.isEmpty())
219      {
220        if (n.isEmpty()) n = index.numElements();
221
222        // It's not so correct but if begin is not the first value of index
223        // then data on the local axis has user-defined distribution. In this case, begin has no meaning.
224        if (begin.isEmpty()) begin = index(0);         
225      }
226      else 
227      {
228        if (!this->begin.isEmpty())
229        {
230          if (begin < 0 || begin > size - 1)
231            ERROR("CAxis::checkAttributes(void)",
232                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
233                  << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
234        }
235        else this->begin.setValue(0);
236
237        if (!this->n.isEmpty())
238        {
239          if (n < 0 || n > size)
240            ERROR("CAxis::checkAttributes(void)",
241                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
242                  << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
243        }
244        else this->n.setValue(size);
245
246        {
247          index.resize(n);
248          for (int i = 0; i < n; ++i) index(i) = i+begin;
249        }
250      }
251
252      if (!this->value.isEmpty())
253      {
254        StdSize true_size = value.numElements();
255        if (this->n.getValue() != true_size)
256          ERROR("CAxis::checkAttributes(void)",
257                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
258                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
259        this->hasValue = true;
260      }
261
262      this->checkData();
263      this->checkZoom();
264      this->checkMask();
265      this->checkBounds();
266      this->checkLabel();
267
268      isDistributed_ = (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
269                       (!this->n.isEmpty() && (this->n != this->n_glo));
270
271      // A same stupid condition to make sure that if there is only one client, axis
272      // should be considered to be distributed. This should be a temporary solution     
273      isDistributed_ |= (1 == CContext::getCurrent()->client->clientSize);
274   }
275
276   void CAxis::checkData()
277   {
278      if (data_begin.isEmpty()) data_begin.setValue(0);
279
280      if (data_n.isEmpty())
281      {
282        data_n.setValue(n);
283      }
284      else if (data_n.getValue() < 0)
285      {
286        ERROR("CAxis::checkData(void)",
287              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
288              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
289      }
290
291      if (data_index.isEmpty())
292      {
293        data_index.resize(data_n);
294        for (int i = 0; i < data_n; ++i) data_index(i) = i;
295      }
296   }
297
298   void CAxis::checkZoom(void)
299   {
300     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
301     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
302   }
303
304   void CAxis::checkMask()
305   {
306      if (!mask.isEmpty())
307      {
308         if (mask.extent(0) != n)
309           ERROR("CAxis::checkMask(void)",
310                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
311                 << "The mask does not have the same size as the local domain." << std::endl
312                 << "Local size is " << n.getValue() << "." << std::endl
313                 << "Mask size is " << mask.extent(0) << ".");
314      }
315      else // (mask.isEmpty())
316      { // If no mask was defined, we create a default one without any masked point.
317         mask.resize(n);
318         for (int i = 0; i < n; ++i)
319         {
320           mask(i) = true;
321         }
322      }
323   }
324
325  void CAxis::checkBounds()
326  {
327    if (!bounds.isEmpty())
328    {
329      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
330        ERROR("CAxis::checkAttributes(void)",
331              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
332              << "Axis size is " << n.getValue() << "." << std::endl
333              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
334      hasBounds_ = true;
335    }
336    else hasBounds_ = false;
337  }
338
339  void CAxis::checkLabel()
340  {
341    if (!label.isEmpty())
342    {
343      if (label.extent(0) != n)
344        ERROR("CAxis::checkLabel(void)",
345              << "The label array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension of axis size." << std::endl
346              << "Axis size is " << n.getValue() << "." << std::endl
347              << "label size is "<< label.extent(0)<<  " .");
348      hasLabel = true;
349    }
350    else hasLabel = false;
351  }
352  void CAxis::checkEligibilityForCompressedOutput()
353  {
354    // We don't check if the mask is valid here, just if a mask has been defined at this point.
355    isCompressible_ = !mask.isEmpty();
356  }
357
358  bool CAxis::dispatchEvent(CEventServer& event)
359   {
360      if (SuperClass::dispatchEvent(event)) return true;
361      else
362      {
363        switch(event.type)
364        {
365           case EVENT_ID_SERVER_ATTRIBUT :
366             recvServerAttribut(event);
367             return true;
368             break;
369           case EVENT_ID_INDEX:
370            recvIndex(event);
371            return true;
372            break;
373          case EVENT_ID_DISTRIBUTED_VALUE:
374            recvDistributedValue(event);
375            return true;
376            break;
377          case EVENT_ID_NON_DISTRIBUTED_VALUE:
378            recvNonDistributedValue(event);
379            return true;
380            break;
381           default :
382             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
383                    << "Unknown Event");
384           return false;
385         }
386      }
387   }
388
389   void CAxis::checkAttributesOnClient()
390   {
391     if (this->areClientAttributesChecked_) return;
392
393     this->checkAttributes();
394
395     this->areClientAttributesChecked_ = true;
396   }
397
398   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
399                                                          CServerDistributionDescription::ServerDistributionType distType)
400   {
401     CContext* context=CContext::getCurrent() ;
402
403     if (this->isClientAfterTransformationChecked) return;
404     if (context->hasClient)
405     {
406       if (n.getValue() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
407     }
408
409     this->isClientAfterTransformationChecked = true;
410   }
411
412   // Send all checked attributes to server
413   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
414                                     CServerDistributionDescription::ServerDistributionType distType)
415   {
416     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
417     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
418     CContext* context = CContext::getCurrent();
419
420     if (this->isChecked) return;
421     if (context->hasClient)
422     {
423       sendServerAttribut(globalDim, orderPositionInGrid, distType);
424       if (hasValue) sendValue();
425     }
426
427     this->isChecked = true;
428   }
429
430  void CAxis::sendValue()
431  {
432     if (n.getValue() == n_glo.getValue())
433       sendNonDistributedValue();
434     else
435       sendDistributedValue();
436  }
437
438  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
439                                     CServerDistributionDescription::ServerDistributionType distType)
440  {
441    CContext* context = CContext::getCurrent();
442    CContextClient* client = context->client;
443    int nbServer = client->serverSize;
444    int range, clientSize = client->clientSize;
445    int rank = client->clientRank;
446
447    size_t ni = this->n.getValue();
448    size_t ibegin = this->begin.getValue();
449    size_t zoom_end = global_zoom_begin+global_zoom_n-1;
450    size_t nZoomCount = 0;
451    size_t nbIndex = index.numElements();
452    for (size_t idx = 0; idx < nbIndex; ++idx)
453    {
454      size_t globalIndex = index(idx);
455      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
456    }
457
458    CArray<size_t,1> globalIndexAxis(nbIndex);
459    std::vector<size_t> globalAxisZoom(nZoomCount);
460    nZoomCount = 0;
461    for (size_t idx = 0; idx < nbIndex; ++idx)
462    {
463      size_t globalIndex = index(idx);
464      globalIndexAxis(idx) = globalIndex;
465      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
466      {
467        globalAxisZoom[nZoomCount] = globalIndex;
468        ++nZoomCount;
469      }
470    }
471
472    std::set<int> writtenInd;
473    if (isCompressible_)
474    {
475      for (int idx = 0; idx < data_index.numElements(); ++idx)
476      {
477        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
478
479        if (ind >= 0 && ind < ni && mask(ind))
480        {
481          ind += ibegin;
482          if (ind >= global_zoom_begin && ind <= zoom_end)
483            writtenInd.insert(ind);
484        }
485      }
486    }
487
488    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
489    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
490    CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
491    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
492    {
493      std::vector<int> nGlobAxis(1);
494      nGlobAxis[0] = n_glo.getValue();
495
496      size_t globalSizeIndex = 1, indexBegin, indexEnd;
497      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
498      indexBegin = 0;
499      if (globalSizeIndex <= clientSize)
500      {
501        indexBegin = rank%globalSizeIndex;
502        indexEnd = indexBegin;
503      }
504      else
505      {
506        for (int i = 0; i < clientSize; ++i)
507        {
508          range = globalSizeIndex / clientSize;
509          if (i < (globalSizeIndex%clientSize)) ++range;
510          if (i == client->clientRank) break;
511          indexBegin += range;
512        }
513        indexEnd = indexBegin + range - 1;
514      }
515
516      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
517      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
518      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
519      clientServerMap->computeServerIndexMapping(globalIndexAxis);
520      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
521      delete clientServerMap;
522    }
523    else
524    {
525      std::vector<size_t> globalIndexServer(n_glo.getValue());
526      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
527      {
528        globalIndexServer[idx] = idx;
529      }
530
531      for (int idx = 0; idx < nbServer; ++idx)
532      {
533        globalIndexAxisOnServer[idx] = globalIndexServer;
534      }
535    }
536
537    CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(),
538                                                         ite = globalIndexAxisOnServer.end();
539    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
540                                        iteVec = (globalAxisZoom).end();
541    indSrv_.clear();
542    indWrittenSrv_.clear();
543    for (; it != ite; ++it)
544    {
545      int rank = it->first;
546      const std::vector<size_t>& globalIndexTmp = it->second;
547      int nb = globalIndexTmp.size();
548
549      for (int i = 0; i < nb; ++i)
550      {
551        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
552        {
553          indSrv_[rank].push_back(globalIndexTmp[i]);
554        }
555
556        if (writtenInd.count(globalIndexTmp[i]))
557        {
558          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
559        }
560      }
561    }
562
563    connectedServerRank_.clear();
564    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
565      connectedServerRank_.push_back(it->first);
566    }
567
568    if (!indSrv_.empty())
569    {
570      std::map<int, vector<size_t> >::const_iterator itIndSrv  = indSrv_.begin(),
571                                                     iteIndSrv = indSrv_.end();
572      connectedServerRank_.clear();
573      for (; itIndSrv != iteIndSrv; ++itIndSrv)
574        connectedServerRank_.push_back(itIndSrv->first);
575    }
576    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
577  }
578
579  void CAxis::sendNonDistributedValue()
580  {
581    CContext* context = CContext::getCurrent();
582    CContextClient* client = context->client;
583    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
584
585    int zoom_end = global_zoom_begin + global_zoom_n - 1;
586    int nb = 0;
587    for (size_t idx = 0; idx < n; ++idx)
588    {
589      size_t globalIndex = begin + idx;
590      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
591    }
592
593    int nbWritten = 0;
594    if (isCompressible_)
595    {
596      for (int idx = 0; idx < data_index.numElements(); ++idx)
597      {
598        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
599
600        if (ind >= 0 && ind < n && mask(ind))
601        {
602          ind += begin;
603          if (ind >= global_zoom_begin && ind <= zoom_end)
604            ++nbWritten;
605        }
606      }
607    }
608
609    CArray<double,1> val(nb);
610    nb = 0;
611    for (size_t idx = 0; idx < n; ++idx)
612    {
613      size_t globalIndex = begin + idx;
614      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
615      {
616        val(nb) = value(idx);
617        ++nb;
618      }
619    }
620
621    CArray<int, 1> writtenInd(nbWritten);
622    nbWritten = 0;
623    if (isCompressible_)
624    {
625      for (int idx = 0; idx < data_index.numElements(); ++idx)
626      {
627        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
628
629        if (ind >= 0 && ind < n && mask(ind))
630        {
631          ind += begin;
632          if (ind >= global_zoom_begin && ind <= zoom_end)
633          {
634            writtenInd(nbWritten) = ind;
635            ++nbWritten;
636          }
637        }
638      }
639    }
640
641    if (client->isServerLeader())
642    {
643      std::list<CMessage> msgs;
644
645      const std::list<int>& ranks = client->getRanksServerLeader();
646      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
647      {
648        msgs.push_back(CMessage());
649        CMessage& msg = msgs.back();
650        msg << this->getId();
651        msg << val;
652        if (isCompressible_)
653          msg << writtenInd;
654        event.push(*itRank, 1, msg);
655      }
656      client->sendEvent(event);
657    }
658    else client->sendEvent(event);
659  }
660
661  void CAxis::sendDistributedValue(void)
662  {
663    int ns, n, i, j, ind, nv, idx;
664    CContext* context = CContext::getCurrent();
665    CContextClient* client=context->client;
666
667    // send value for each connected server
668    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
669    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
670
671    list<CMessage> list_msgsIndex, list_msgsVal;
672    list<CArray<int,1> > list_indi;
673    list<CArray<int,1> > list_writtenInd;
674    list<CArray<double,1> > list_val;
675    list<CArray<double,2> > list_bounds;
676    list<CArray<StdString,1> > list_label;
677
678    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
679    iteMap = indSrv_.end();
680    for (int k = 0; k < connectedServerRank_.size(); ++k)
681    {
682      int nbData = 0;
683      int rank = connectedServerRank_[k];
684      it = indSrv_.find(rank);
685      if (iteMap != it)
686        nbData = it->second.size();
687
688      list_indi.push_back(CArray<int,1>(nbData));
689      list_val.push_back(CArray<double,1>(nbData));
690
691      if (hasBounds_)
692      {
693        list_bounds.push_back(CArray<double,2>(2,nbData));
694      }
695     
696      if (hasLabel)
697      {
698        list_label.push_back(CArray<StdString,1>(nbData));
699      }
700
701      CArray<int,1>& indi = list_indi.back();
702      CArray<double,1>& val = list_val.back();
703
704      for (n = 0; n < nbData; ++n)
705      {
706        idx = static_cast<int>(it->second[n]);
707        ind = idx - begin;
708
709        val(n) = value(ind);
710        indi(n) = idx;
711
712        if (hasBounds_)
713        {
714          CArray<double,2>& boundsVal = list_bounds.back();
715          boundsVal(0, n) = bounds(0,n);
716          boundsVal(1, n) = bounds(1,n);
717        }
718       
719        if (hasLabel)
720        {
721          CArray<StdString,1>& labelVal = list_label.back();
722          labelVal(n) = label(n);
723        }
724      }
725
726      list_msgsIndex.push_back(CMessage());
727      list_msgsIndex.back() << this->getId() << list_indi.back();
728
729      if (isCompressible_)
730      {
731        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
732        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
733        CArray<int,1>& writtenInd = list_writtenInd.back();
734
735        for (n = 0; n < writtenInd.numElements(); ++n)
736          writtenInd(n) = writtenIndSrc[n];
737
738        list_msgsIndex.back() << writtenInd;
739      }
740
741      list_msgsVal.push_back(CMessage());
742      list_msgsVal.back() << this->getId() << list_val.back();
743
744      if (hasBounds_)
745      {
746        list_msgsVal.back() << list_bounds.back();
747      }
748 
749      if (hasLabel)
750      {
751        list_msgsVal.back() << list_label.back();
752      }
753
754      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
755      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
756    }
757
758    client->sendEvent(eventIndex);
759    client->sendEvent(eventVal);
760  }
761
762  void CAxis::recvIndex(CEventServer& event)
763  {
764    CAxis* axis;
765
766    list<CEventServer::SSubEvent>::iterator it;
767    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
768    {
769      CBufferIn* buffer = it->buffer;
770      string axisId;
771      *buffer >> axisId;
772      axis = get(axisId);
773      axis->recvIndex(it->rank, *buffer);
774    }
775
776    if (axis->isCompressible_)
777    {
778      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
779
780      CContextServer* server = CContext::getCurrent()->server;
781      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
782      MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
783      MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
784      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
785    }
786  }
787
788  void CAxis::recvIndex(int rank, CBufferIn& buffer)
789  {
790    buffer >> indiSrv_[rank];
791
792    if (isCompressible_)
793    {
794      CArray<int, 1> writtenIndexes;
795      buffer >> writtenIndexes;
796      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
797      for (int i = 0; i < writtenIndexes.numElements(); ++i)
798        indexesToWrite.push_back(writtenIndexes(i));
799    }
800  }
801
802  void CAxis::recvDistributedValue(CEventServer& event)
803  {
804    list<CEventServer::SSubEvent>::iterator it;
805    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
806    {
807      CBufferIn* buffer = it->buffer;
808      string axisId;
809      *buffer >> axisId;
810      get(axisId)->recvDistributedValue(it->rank, *buffer);
811    }
812  }
813
814  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
815  {
816    CArray<int,1> &indi = indiSrv_[rank];
817    CArray<double,1> val;
818    CArray<double,2> boundsVal;
819    CArray<StdString,1> labelVal;
820
821    buffer >> val;
822    if (hasBounds_) buffer >> boundsVal;
823    if (hasLabel) buffer >> labelVal;
824
825    int i, j, ind_srv;
826    for (int ind = 0; ind < indi.numElements(); ++ind)
827    {
828      i = indi(ind);
829      ind_srv = i - zoom_begin_srv;
830      value_srv(ind_srv) = val(ind);
831      if (hasBounds_)
832      {
833        bound_srv(0,ind_srv) = boundsVal(0, ind);
834        bound_srv(1,ind_srv) = boundsVal(1, ind);
835      }
836
837      if (hasLabel)
838      {
839        label_srv(ind_srv) = labelVal( ind);
840      }
841
842    }
843  }
844
845   void CAxis::recvNonDistributedValue(CEventServer& event)
846  {
847    CAxis* axis;
848
849    list<CEventServer::SSubEvent>::iterator it;
850    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
851    {
852      CBufferIn* buffer = it->buffer;
853      string axisId;
854      *buffer >> axisId;
855      axis = get(axisId);
856      axis->recvNonDistributedValue(it->rank, *buffer);
857    }
858
859    if (axis->isCompressible_)
860    {
861      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
862
863      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
864      axis->offsetWrittenIndexes_ = 0;
865    }
866  }
867
868  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
869  {
870    CArray<double,1> val;
871    buffer >> val;
872
873    for (int ind = 0; ind < val.numElements(); ++ind)
874    {
875      value_srv(ind) = val(ind);
876      if (hasBounds_)
877      {
878        bound_srv(0,ind) = bounds(0,ind);
879        bound_srv(1,ind) = bounds(1,ind);
880      }
881      if (hasLabel)
882      {
883        label_srv(ind) = label(ind);
884      }
885    }
886
887    if (isCompressible_)
888    {
889      CArray<int, 1> writtenIndexes;
890      buffer >> writtenIndexes;
891      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
892      for (int i = 0; i < writtenIndexes.numElements(); ++i)
893        indexesToWrite.push_back(writtenIndexes(i));
894    }
895  }
896
897  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
898                                 CServerDistributionDescription::ServerDistributionType distType)
899  {
900    CContext* context = CContext::getCurrent();
901    CContextClient* client = context->client;
902    int nbServer = client->serverSize;
903
904    CServerDistributionDescription serverDescription(globalDim, nbServer);
905    serverDescription.computeServerDistribution();
906
907    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
908    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
909
910    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
911    if (client->isServerLeader())
912    {
913      std::list<CMessage> msgs;
914
915      const std::list<int>& ranks = client->getRanksServerLeader();
916      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
917      {
918        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
919        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
920        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
921        const int end   = begin + ni - 1;
922
923        msgs.push_back(CMessage());
924        CMessage& msg = msgs.back();
925        msg << this->getId();
926        msg << ni << begin << end;
927        msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
928        msg << isCompressible_;
929
930        event.push(*itRank,1,msg);
931      }
932      client->sendEvent(event);
933    }
934    else client->sendEvent(event);
935  }
936
937  void CAxis::recvServerAttribut(CEventServer& event)
938  {
939    CBufferIn* buffer = event.subEvents.begin()->buffer;
940    string axisId;
941    *buffer >> axisId;
942    get(axisId)->recvServerAttribut(*buffer);
943  }
944
945  void CAxis::recvServerAttribut(CBufferIn& buffer)
946  {
947    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
948
949    buffer >> ni_srv >> begin_srv >> end_srv;
950    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
951    buffer >> isCompressible_;
952    global_zoom_begin = global_zoom_begin_tmp;
953    global_zoom_n  = global_zoom_n_tmp;
954    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
955
956    zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
957    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
958    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
959
960    if (zoom_size_srv<=0)
961    {
962      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
963    }
964
965    if (n_glo == n)
966    {
967      zoom_begin_srv = global_zoom_begin;
968      zoom_end_srv   = global_zoom_end; //zoom_end;
969      zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
970    }
971    if (hasValue)
972    {
973      value_srv.resize(zoom_size_srv);
974      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
975      if (hasLabel)  label_srv.resize(zoom_size_srv);
976    }
977  }
978
979  /*!
980    Compare two axis objects.
981    They are equal if only if they have identical attributes as well as their values.
982    Moreover, they must have the same transformations.
983  \param [in] axis Compared axis
984  \return result of the comparison
985  */
986  bool CAxis::isEqual(CAxis* obj)
987  {
988    bool objEqual = SuperClass::isEqual(obj);
989    if (!objEqual) return objEqual;
990
991    TransMapTypes thisTrans = this->getAllTransformations();
992    TransMapTypes objTrans  = obj->getAllTransformations();
993
994    TransMapTypes::const_iterator it, itb, ite;
995    std::vector<ETranformationType> thisTransType, objTransType;
996    for (it = thisTrans.begin(); it != thisTrans.end(); ++it)
997      thisTransType.push_back(it->first);
998    for (it = objTrans.begin(); it != objTrans.end(); ++it)
999      objTransType.push_back(it->first);
1000
1001    if (thisTransType.size() != objTransType.size()) return false;
1002    for (int idx = 0; idx < thisTransType.size(); ++idx)
1003      objEqual &= (thisTransType[idx] == objTransType[idx]);
1004
1005    return objEqual;
1006  }
1007
1008  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
1009  {
1010    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
1011    return transformationMap_.back().second;
1012  }
1013
1014  bool CAxis::hasTransformation()
1015  {
1016    return (!transformationMap_.empty());
1017  }
1018
1019  void CAxis::setTransformations(const TransMapTypes& axisTrans)
1020  {
1021    transformationMap_ = axisTrans;
1022  }
1023
1024  CAxis::TransMapTypes CAxis::getAllTransformations(void)
1025  {
1026    return transformationMap_;
1027  }
1028
1029  void CAxis::duplicateTransformation(CAxis* src)
1030  {
1031    if (src->hasTransformation())
1032    {
1033      this->setTransformations(src->getAllTransformations());
1034    }
1035  }
1036
1037  /*!
1038   * Go through the hierarchy to find the axis from which the transformations must be inherited
1039   */
1040  void CAxis::solveInheritanceTransformation()
1041  {
1042    if (hasTransformation() || !hasDirectAxisReference())
1043      return;
1044
1045    CAxis* axis = this;
1046    std::vector<CAxis*> refAxis;
1047    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
1048    {
1049      refAxis.push_back(axis);
1050      axis = axis->getDirectAxisReference();
1051    }
1052
1053    if (axis->hasTransformation())
1054      for (size_t i = 0; i < refAxis.size(); ++i)
1055        refAxis[i]->setTransformations(axis->getAllTransformations());
1056  }
1057
1058  void CAxis::parse(xml::CXMLNode & node)
1059  {
1060    SuperClass::parse(node);
1061
1062    if (node.goToChildElement())
1063    {
1064      StdString nodeElementName;
1065      do
1066      {
1067        StdString nodeId("");
1068        if (node.getAttributes().end() != node.getAttributes().find("id"))
1069        { nodeId = node.getAttributes()["id"]; }
1070
1071        nodeElementName = node.getElementName();
1072        std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_.end(), it;
1073        it = transformationMapList_.find(nodeElementName);
1074        if (ite != it)
1075        {
1076          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1077                                                                                                               nodeId,
1078                                                                                                               &node)));
1079        }
1080        else
1081        {
1082          ERROR("void CAxis::parse(xml::CXMLNode & node)",
1083                << "The transformation " << nodeElementName << " has not been supported yet.");
1084        }
1085      } while (node.goToNextElement()) ;
1086      node.goToParentElement();
1087    }
1088  }
1089
1090  DEFINE_REF_FUNC(Axis,axis)
1091
1092   ///---------------------------------------------------------------
1093
1094} // namespace xios
Note: See TracBrowser for help on using the repository browser.