source: XIOS/trunk/src/node/axis.cpp @ 1050

Last change on this file since 1050 was 1050, checked in by ymipsl, 7 years ago
  • Add label attribute for axis
  • if label attribute is present, then only only string label for each level will be output in netcdf file, ie value and bounds will not be output.

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 34.0 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isClientAfterTransformationChecked(false)
27      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
29      , transformationMap_(), hasValue(false), hasLabel(false)
30   {
31   }
32
33   CAxis::CAxis(const StdString & id)
34      : CObjectTemplate<CAxis>(id)
35      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
36      , isClientAfterTransformationChecked(false)
37      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
38      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
39      , transformationMap_(), hasValue(false), hasLabel(false)
40   {
41   }
42
43   CAxis::~CAxis(void)
44   { /* Ne rien faire de plus */ }
45
46   std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
47   bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_);
48   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
49   {
50     m["zoom_axis"] = TRANS_ZOOM_AXIS;
51     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
52     m["inverse_axis"] = TRANS_INVERSE_AXIS;
53     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
54     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
55   }
56
57   ///---------------------------------------------------------------
58
59   const std::set<StdString> & CAxis::getRelFiles(void) const
60   {
61      return (this->relFiles);
62   }
63
64   bool CAxis::IsWritten(const StdString & filename) const
65   {
66      return (this->relFiles.find(filename) != this->relFiles.end());
67   }
68
69   bool CAxis::isWrittenCompressed(const StdString& filename) const
70   {
71      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
72   }
73
74   bool CAxis::isDistributed(void) const
75   {
76      return isDistributed_;
77   }
78
79   /*!
80    * Test whether the data defined on the axis can be outputted in a compressed way.
81    *
82    * \return true if and only if a mask was defined for this axis
83    */
84   bool CAxis::isCompressible(void) const
85   {
86      return isCompressible_;
87   }
88
89   void CAxis::addRelFile(const StdString & filename)
90   {
91      this->relFiles.insert(filename);
92   }
93
94   void CAxis::addRelFileCompressed(const StdString& filename)
95   {
96      this->relFilesCompressed.insert(filename);
97   }
98
99   //----------------------------------------------------------------
100
101   const std::vector<int>& CAxis::getIndexesToWrite(void) const
102   {
103     return indexesToWrite;
104   }
105
106   /*!
107     Returns the number of indexes written by each server.
108     \return the number of indexes written by each server
109   */
110   int CAxis::getNumberWrittenIndexes() const
111   {
112     return numberWrittenIndexes_;
113   }
114
115   /*!
116     Returns the total number of indexes written by the servers.
117     \return the total number of indexes written by the servers
118   */
119   int CAxis::getTotalNumberWrittenIndexes() const
120   {
121     return totalNumberWrittenIndexes_;
122   }
123
124   /*!
125     Returns the offset of indexes written by each server.
126     \return the offset of indexes written by each server
127   */
128   int CAxis::getOffsetWrittenIndexes() const
129   {
130     return offsetWrittenIndexes_;
131   }
132
133   //----------------------------------------------------------------
134
135   /*!
136    * Compute the minimum buffer size required to send the attributes to the server(s).
137    *
138    * \return A map associating the server rank with its minimum buffer size.
139    */
140   std::map<int, StdSize> CAxis::getAttributesBufferSize()
141   {
142     CContextClient* client = CContext::getCurrent()->client;
143
144     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
145
146     bool isNonDistributed = (n == n_glo);
147
148     if (client->isServerLeader())
149     {
150       // size estimation for sendServerAttribut
151       size_t size = 6 * sizeof(size_t);
152       // size estimation for sendNonDistributedValue
153       if (isNonDistributed)
154         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
155       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
156
157       const std::list<int>& ranks = client->getRanksServerLeader();
158       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
159       {
160         if (size > attributesSizes[*itRank])
161           attributesSizes[*itRank] = size;
162       }
163     }
164
165     if (!isNonDistributed)
166     {
167       // size estimation for sendDistributedValue
168       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
169       for (it = indSrv_.begin(); it != ite; ++it)
170       {
171         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
172         if (isCompressible_)
173           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
174
175         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
176         if (hasBounds_)
177           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
178 
179         if (hasLabel)
180           sizeValEvent += CArray<StdString,1>::size(it->second.size());
181
182         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
183         if (size > attributesSizes[it->first])
184           attributesSizes[it->first] = size;
185       }
186     }
187
188     return attributesSizes;
189   }
190
191   //----------------------------------------------------------------
192
193   StdString CAxis::GetName(void)   { return (StdString("axis")); }
194   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
195   ENodeType CAxis::GetType(void)   { return (eAxis); }
196
197   //----------------------------------------------------------------
198
199   CAxis* CAxis::createAxis()
200   {
201     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
202     return axis;
203   }
204
205   void CAxis::fillInValues(const CArray<double,1>& values)
206   {
207     this->value = values;
208   }
209
210   void CAxis::checkAttributes(void)
211   {
212      if (this->n_glo.isEmpty())
213        ERROR("CAxis::checkAttributes(void)",
214              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
215              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
216      StdSize size = this->n_glo.getValue();
217
218      if (!this->index.isEmpty())
219      {
220        if (n.isEmpty()) n = index.numElements();
221
222        // It's not so correct but if begin is not the first value of index
223        // then data on the local axis has user-defined distribution. In this case, begin has no meaning.
224        if (begin.isEmpty()) begin = index(0);         
225      }
226      else 
227      {
228        if (!this->begin.isEmpty())
229        {
230          if (begin < 0 || begin > size - 1)
231            ERROR("CAxis::checkAttributes(void)",
232                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
233                  << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
234        }
235        else this->begin.setValue(0);
236
237        if (!this->n.isEmpty())
238        {
239          if (n < 0 || n > size)
240            ERROR("CAxis::checkAttributes(void)",
241                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
242                  << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
243        }
244        else this->n.setValue(size);
245
246        {
247          index.resize(n);
248          for (int i = 0; i < n; ++i) index(i) = i+begin;
249        }
250      }
251
252      if (!this->value.isEmpty())
253      {
254        StdSize true_size = value.numElements();
255        if (this->n.getValue() != true_size)
256          ERROR("CAxis::checkAttributes(void)",
257                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
258                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
259        this->hasValue = true;
260      }
261
262      this->checkData();
263      this->checkZoom();
264      this->checkMask();
265      this->checkBounds();
266      this->checkLabel();
267
268      isDistributed_ = (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
269                       (!this->n.isEmpty() && (this->n != this->n_glo));
270
271      // A same stupid condition to make sure that if there is only one client, axis
272      // should be considered to be distributed. This should be a temporary solution     
273      isDistributed_ |= (1 == CContext::getCurrent()->client->clientSize);
274   }
275
276   void CAxis::checkData()
277   {
278      if (data_begin.isEmpty()) data_begin.setValue(0);
279
280      if (data_n.isEmpty())
281      {
282        data_n.setValue(n);
283      }
284      else if (data_n.getValue() < 0)
285      {
286        ERROR("CAxis::checkData(void)",
287              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
288              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
289      }
290
291      if (data_index.isEmpty())
292      {
293        data_index.resize(data_n);
294        for (int i = 0; i < data_n; ++i) data_index(i) = i;
295      }
296   }
297
298   void CAxis::checkZoom(void)
299   {
300     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
301     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
302   }
303
304   void CAxis::checkMask()
305   {
306      if (!mask.isEmpty())
307      {
308         if (mask.extent(0) != n)
309           ERROR("CAxis::checkMask(void)",
310                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
311                 << "The mask does not have the same size as the local domain." << std::endl
312                 << "Local size is " << n.getValue() << "." << std::endl
313                 << "Mask size is " << mask.extent(0) << ".");
314      }
315      else // (mask.isEmpty())
316      { // If no mask was defined, we create a default one without any masked point.
317         mask.resize(n);
318         for (int i = 0; i < n; ++i)
319         {
320           mask(i) = true;
321         }
322      }
323   }
324
325  void CAxis::checkBounds()
326  {
327    if (!bounds.isEmpty())
328    {
329      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
330        ERROR("CAxis::checkAttributes(void)",
331              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
332              << "Axis size is " << n.getValue() << "." << std::endl
333              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
334      hasBounds_ = true;
335    }
336    else hasBounds_ = false;
337  }
338
339  void CAxis::checkLabel()
340  {
341    if (!label.isEmpty())
342    {
343      if (label.extent(0) != n)
344        ERROR("CAxis::checkLabel(void)",
345              << "The label array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension of axis size." << std::endl
346              << "Axis size is " << n.getValue() << "." << std::endl
347              << "label size is "<< label.extent(0)<<  " .");
348      hasLabel = true;
349    }
350    else hasLabel = false;
351  }
352  void CAxis::checkEligibilityForCompressedOutput()
353  {
354    // We don't check if the mask is valid here, just if a mask has been defined at this point.
355    isCompressible_ = !mask.isEmpty();
356  }
357
358  bool CAxis::dispatchEvent(CEventServer& event)
359   {
360      if (SuperClass::dispatchEvent(event)) return true;
361      else
362      {
363        switch(event.type)
364        {
365           case EVENT_ID_SERVER_ATTRIBUT :
366             recvServerAttribut(event);
367             return true;
368             break;
369           case EVENT_ID_INDEX:
370            recvIndex(event);
371            return true;
372            break;
373          case EVENT_ID_DISTRIBUTED_VALUE:
374            recvDistributedValue(event);
375            return true;
376            break;
377          case EVENT_ID_NON_DISTRIBUTED_VALUE:
378            recvNonDistributedValue(event);
379            return true;
380            break;
381           default :
382             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
383                    << "Unknown Event");
384           return false;
385         }
386      }
387   }
388
389   void CAxis::checkAttributesOnClient()
390   {
391     if (this->areClientAttributesChecked_) return;
392
393     this->checkAttributes();
394
395     this->areClientAttributesChecked_ = true;
396   }
397
398   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
399                                                          CServerDistributionDescription::ServerDistributionType distType)
400   {
401     CContext* context=CContext::getCurrent() ;
402
403     if (this->isClientAfterTransformationChecked) return;
404     if (context->hasClient)
405     {
406       if (n.getValue() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
407     }
408
409     this->isClientAfterTransformationChecked = true;
410   }
411
412   // Send all checked attributes to server
413   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
414                                     CServerDistributionDescription::ServerDistributionType distType)
415   {
416     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
417     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
418     CContext* context = CContext::getCurrent();
419
420     if (this->isChecked) return;
421     if (context->hasClient)
422     {
423       sendServerAttribut(globalDim, orderPositionInGrid, distType);
424       if (hasValue) sendValue();
425     }
426
427     this->isChecked = true;
428   }
429
430  void CAxis::sendValue()
431  {
432     if (n.getValue() == n_glo.getValue())
433       sendNonDistributedValue();
434     else
435       sendDistributedValue();
436  }
437
438  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
439                                     CServerDistributionDescription::ServerDistributionType distType)
440  {
441    CContext* context = CContext::getCurrent();
442    CContextClient* client = context->client;
443    int nbServer = client->serverSize;
444    int range, clientSize = client->clientSize;
445    int rank = client->clientRank;
446
447    size_t ni = this->n.getValue();
448    size_t ibegin = this->begin.getValue();
449    size_t zoom_end = global_zoom_begin+global_zoom_n-1;
450    size_t nZoomCount = 0;
451    size_t nbIndex = index.numElements();
452    for (size_t idx = 0; idx < nbIndex; ++idx)
453    {
454      size_t globalIndex = index(idx);
455      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
456    }
457
458    CArray<size_t,1> globalIndexAxis(nbIndex);
459    std::vector<size_t> globalAxisZoom(nZoomCount);
460    nZoomCount = 0;
461    for (size_t idx = 0; idx < nbIndex; ++idx)
462    {
463      size_t globalIndex = index(idx);
464      globalIndexAxis(idx) = globalIndex;
465      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
466      {
467        globalAxisZoom[nZoomCount] = globalIndex;
468        ++nZoomCount;
469      }
470    }
471
472    std::set<int> writtenInd;
473    if (isCompressible_)
474    {
475      for (int idx = 0; idx < data_index.numElements(); ++idx)
476      {
477        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
478
479        if (ind >= 0 && ind < ni && mask(ind))
480        {
481          ind += ibegin;
482          if (ind >= global_zoom_begin && ind <= zoom_end)
483            writtenInd.insert(ind);
484        }
485      }
486    }
487
488    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
489    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
490    CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
491    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
492    {
493      std::vector<int> nGlobAxis(1);
494      nGlobAxis[0] = n_glo.getValue();
495
496      size_t globalSizeIndex = 1, indexBegin, indexEnd;
497      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
498      indexBegin = 0;
499      if (globalSizeIndex <= clientSize)
500      {
501        indexBegin = rank%globalSizeIndex;
502        indexEnd = indexBegin;
503      }
504      else
505      {
506        for (int i = 0; i < clientSize; ++i)
507        {
508          range = globalSizeIndex / clientSize;
509          if (i < (globalSizeIndex%clientSize)) ++range;
510          if (i == client->clientRank) break;
511          indexBegin += range;
512        }
513        indexEnd = indexBegin + range - 1;
514      }
515
516      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
517      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
518      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
519      clientServerMap->computeServerIndexMapping(globalIndexAxis);
520      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
521      delete clientServerMap;
522    }
523    else
524    {
525      std::vector<size_t> globalIndexServer(n_glo.getValue());
526      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
527      {
528        globalIndexServer[idx] = idx;
529      }
530
531      for (int idx = 0; idx < nbServer; ++idx)
532      {
533        globalIndexAxisOnServer[idx] = globalIndexServer;
534      }
535    }
536
537    CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(),
538                                                         ite = globalIndexAxisOnServer.end();
539    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
540                                        iteVec = (globalAxisZoom).end();
541    indSrv_.clear();
542    indWrittenSrv_.clear();
543    for (; it != ite; ++it)
544    {
545      int rank = it->first;
546      const std::vector<size_t>& globalIndexTmp = it->second;
547      int nb = globalIndexTmp.size();
548
549      for (int i = 0; i < nb; ++i)
550      {
551        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
552        {
553          indSrv_[rank].push_back(globalIndexTmp[i]);
554        }
555
556        if (writtenInd.count(globalIndexTmp[i]))
557        {
558          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
559        }
560      }
561    }
562
563    connectedServerRank_.clear();
564    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
565      connectedServerRank_.push_back(it->first);
566    }
567
568    if (!indSrv_.empty())
569    {
570      std::map<int, vector<size_t> >::const_iterator itIndSrv  = indSrv_.begin(),
571                                                     iteIndSrv = indSrv_.end();
572      connectedServerRank_.clear();
573      for (; itIndSrv != iteIndSrv; ++itIndSrv)
574        connectedServerRank_.push_back(itIndSrv->first);
575    }
576    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
577  }
578
579  void CAxis::sendNonDistributedValue()
580  {
581    CContext* context = CContext::getCurrent();
582    CContextClient* client = context->client;
583    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
584
585    int zoom_end = global_zoom_begin + global_zoom_n - 1;
586    int nb = 0;
587    for (size_t idx = 0; idx < n; ++idx)
588    {
589      size_t globalIndex = begin + idx;
590      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
591    }
592
593    int nbWritten = 0;
594    if (isCompressible_)
595    {
596      for (int idx = 0; idx < data_index.numElements(); ++idx)
597      {
598        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
599
600        if (ind >= 0 && ind < n && mask(ind))
601        {
602          ind += begin;
603          if (ind >= global_zoom_begin && ind <= zoom_end)
604            ++nbWritten;
605        }
606      }
607    }
608
609    CArray<double,1> val(nb);
610    nb = 0;
611    for (size_t idx = 0; idx < n; ++idx)
612    {
613      size_t globalIndex = begin + idx;
614      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
615      {
616        val(nb) = value(idx);
617        ++nb;
618      }
619    }
620
621    CArray<int, 1> writtenInd(nbWritten);
622    nbWritten = 0;
623    if (isCompressible_)
624    {
625      for (int idx = 0; idx < data_index.numElements(); ++idx)
626      {
627        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
628
629        if (ind >= 0 && ind < n && mask(ind))
630        {
631          ind += begin;
632          if (ind >= global_zoom_begin && ind <= zoom_end)
633          {
634            writtenInd(nbWritten) = ind;
635            ++nbWritten;
636          }
637        }
638      }
639    }
640
641    if (client->isServerLeader())
642    {
643      std::list<CMessage> msgs;
644
645      const std::list<int>& ranks = client->getRanksServerLeader();
646      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
647      {
648        msgs.push_back(CMessage());
649        CMessage& msg = msgs.back();
650        msg << this->getId();
651        msg << val;
652        if (isCompressible_)
653          msg << writtenInd;
654        event.push(*itRank, 1, msg);
655      }
656      client->sendEvent(event);
657    }
658    else client->sendEvent(event);
659  }
660
661  void CAxis::sendDistributedValue(void)
662  {
663    int ns, n, i, j, ind, nv, idx;
664    CContext* context = CContext::getCurrent();
665    CContextClient* client=context->client;
666
667    // send value for each connected server
668    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
669    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
670
671    list<CMessage> list_msgsIndex, list_msgsVal;
672    list<CArray<int,1> > list_indi;
673    list<CArray<int,1> > list_writtenInd;
674    list<CArray<double,1> > list_val;
675    list<CArray<double,2> > list_bounds;
676    list<CArray<StdString,1> > list_label;
677
678    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
679    iteMap = indSrv_.end();
680    for (int k = 0; k < connectedServerRank_.size(); ++k)
681    {
682      int nbData = 0;
683      int rank = connectedServerRank_[k];
684      it = indSrv_.find(rank);
685      if (iteMap != it)
686        nbData = it->second.size();
687
688      list_indi.push_back(CArray<int,1>(nbData));
689      list_val.push_back(CArray<double,1>(nbData));
690
691      if (hasBounds_)
692      {
693        list_bounds.push_back(CArray<double,2>(2,nbData));
694      }
695     
696      if (hasLabel)
697      {
698        list_label.push_back(CArray<StdString,1>(nbData));
699      }
700
701      CArray<int,1>& indi = list_indi.back();
702      CArray<double,1>& val = list_val.back();
703
704      for (n = 0; n < nbData; ++n)
705      {
706        idx = static_cast<int>(it->second[n]);
707        ind = idx - begin;
708
709        val(n) = value(ind);
710        indi(n) = idx;
711
712        if (hasBounds_)
713        {
714          CArray<double,2>& boundsVal = list_bounds.back();
715          boundsVal(0, n) = bounds(0,n);
716          boundsVal(1, n) = bounds(1,n);
717        }
718       
719        if (hasLabel)
720        {
721          CArray<StdString,1>& labelVal = list_label.back();
722          labelVal(n) = label(n);
723        }
724      }
725
726      list_msgsIndex.push_back(CMessage());
727      list_msgsIndex.back() << this->getId() << list_indi.back();
728
729      if (isCompressible_)
730      {
731        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
732        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
733        CArray<int,1>& writtenInd = list_writtenInd.back();
734
735        for (n = 0; n < writtenInd.numElements(); ++n)
736          writtenInd(n) = writtenIndSrc[n];
737
738        list_msgsIndex.back() << writtenInd;
739      }
740
741      list_msgsVal.push_back(CMessage());
742      list_msgsVal.back() << this->getId() << list_val.back();
743
744      if (hasBounds_)
745      {
746        list_msgsVal.back() << list_bounds.back();
747      }
748 
749      if (hasLabel)
750      {
751        list_msgsVal.back() << list_label.back();
752      }
753
754      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
755      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
756    }
757
758    client->sendEvent(eventIndex);
759    client->sendEvent(eventVal);
760  }
761
762  void CAxis::recvIndex(CEventServer& event)
763  {
764    CAxis* axis;
765
766    list<CEventServer::SSubEvent>::iterator it;
767    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
768    {
769      CBufferIn* buffer = it->buffer;
770      string axisId;
771      *buffer >> axisId;
772      axis = get(axisId);
773      axis->recvIndex(it->rank, *buffer);
774    }
775
776    if (axis->isCompressible_)
777    {
778      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
779
780      CContextServer* server = CContext::getCurrent()->server;
781      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
782      MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
783      MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
784      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
785    }
786  }
787
788  void CAxis::recvIndex(int rank, CBufferIn& buffer)
789  {
790    buffer >> indiSrv_[rank];
791
792    if (isCompressible_)
793    {
794      CArray<int, 1> writtenIndexes;
795      buffer >> writtenIndexes;
796      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
797      for (int i = 0; i < writtenIndexes.numElements(); ++i)
798        indexesToWrite.push_back(writtenIndexes(i));
799    }
800  }
801
802  void CAxis::recvDistributedValue(CEventServer& event)
803  {
804    list<CEventServer::SSubEvent>::iterator it;
805    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
806    {
807      CBufferIn* buffer = it->buffer;
808      string axisId;
809      *buffer >> axisId;
810      get(axisId)->recvDistributedValue(it->rank, *buffer);
811    }
812  }
813
814  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
815  {
816    CArray<int,1> &indi = indiSrv_[rank];
817    CArray<double,1> val;
818    CArray<double,2> boundsVal;
819    CArray<StdString,1> labelVal;
820
821    buffer >> val;
822    if (hasBounds_) buffer >> boundsVal;
823    if (hasLabel) buffer >> labelVal;
824
825    int i, j, ind_srv;
826    for (int ind = 0; ind < indi.numElements(); ++ind)
827    {
828      i = indi(ind);
829      ind_srv = i - zoom_begin_srv;
830      value_srv(ind_srv) = val(ind);
831      if (hasBounds_)
832      {
833        bound_srv(0,ind_srv) = boundsVal(0, ind);
834        bound_srv(1,ind_srv) = boundsVal(1, ind);
835      }
836
837      if (hasLabel)
838      {
839        label_srv(ind_srv) = labelVal( ind);
840      }
841
842    }
843  }
844
845   void CAxis::recvNonDistributedValue(CEventServer& event)
846  {
847    CAxis* axis;
848
849    list<CEventServer::SSubEvent>::iterator it;
850    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
851    {
852      CBufferIn* buffer = it->buffer;
853      string axisId;
854      *buffer >> axisId;
855      axis = get(axisId);
856      axis->recvNonDistributedValue(it->rank, *buffer);
857    }
858
859    if (axis->isCompressible_)
860    {
861      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
862
863      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
864      axis->offsetWrittenIndexes_ = 0;
865    }
866  }
867
868  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
869  {
870    CArray<double,1> val;
871    buffer >> val;
872
873    for (int ind = 0; ind < val.numElements(); ++ind)
874    {
875      value_srv(ind) = val(ind);
876      if (hasBounds_)
877      {
878        bound_srv(0,ind) = bounds(0,ind);
879        bound_srv(1,ind) = bounds(1,ind);
880      }
881      if (hasLabel)
882      {
883        label_srv(ind) = label(ind);
884      }
885    }
886
887    if (isCompressible_)
888    {
889      CArray<int, 1> writtenIndexes;
890      buffer >> writtenIndexes;
891      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
892      for (int i = 0; i < writtenIndexes.numElements(); ++i)
893        indexesToWrite.push_back(writtenIndexes(i));
894    }
895  }
896
897  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
898                                 CServerDistributionDescription::ServerDistributionType distType)
899  {
900    CContext* context = CContext::getCurrent();
901    CContextClient* client = context->client;
902    int nbServer = client->serverSize;
903
904    CServerDistributionDescription serverDescription(globalDim, nbServer);
905    serverDescription.computeServerDistribution();
906
907    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
908    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
909
910    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
911    if (client->isServerLeader())
912    {
913      std::list<CMessage> msgs;
914
915      const std::list<int>& ranks = client->getRanksServerLeader();
916      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
917      {
918        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
919        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
920        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
921        const int end   = begin + ni - 1;
922
923        msgs.push_back(CMessage());
924        CMessage& msg = msgs.back();
925        msg << this->getId();
926        msg << ni << begin << end;
927        msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
928        msg << isCompressible_;
929
930        event.push(*itRank,1,msg);
931      }
932      client->sendEvent(event);
933    }
934    else client->sendEvent(event);
935  }
936
937  void CAxis::recvServerAttribut(CEventServer& event)
938  {
939    CBufferIn* buffer = event.subEvents.begin()->buffer;
940    string axisId;
941    *buffer >> axisId;
942    get(axisId)->recvServerAttribut(*buffer);
943  }
944
945  void CAxis::recvServerAttribut(CBufferIn& buffer)
946  {
947    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
948
949    buffer >> ni_srv >> begin_srv >> end_srv;
950    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
951    buffer >> isCompressible_;
952    global_zoom_begin = global_zoom_begin_tmp;
953    global_zoom_n  = global_zoom_n_tmp;
954    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
955
956    zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
957    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
958    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
959
960    if (zoom_size_srv<=0)
961    {
962      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
963    }
964
965    if (n_glo == n)
966    {
967      zoom_begin_srv = global_zoom_begin;
968      zoom_end_srv   = global_zoom_end; //zoom_end;
969      zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
970    }
971    if (hasValue)
972    {
973      value_srv.resize(zoom_size_srv);
974      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
975      if (hasLabel)  label_srv.resize(zoom_size_srv);
976    }
977  }
978
979  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
980  {
981    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
982    return transformationMap_.back().second;
983  }
984
985  bool CAxis::hasTransformation()
986  {
987    return (!transformationMap_.empty());
988  }
989
990  void CAxis::setTransformations(const TransMapTypes& axisTrans)
991  {
992    transformationMap_ = axisTrans;
993  }
994
995  CAxis::TransMapTypes CAxis::getAllTransformations(void)
996  {
997    return transformationMap_;
998  }
999
1000  /*!
1001    Check the validity of all transformations applied on axis
1002  This functions is called AFTER all inherited attributes are solved
1003  */
1004  void CAxis::checkTransformations()
1005  {
1006    TransMapTypes::const_iterator itb = transformationMap_.begin(), it,
1007                                  ite = transformationMap_.end();
1008//    for (it = itb; it != ite; ++it)
1009//    {
1010//      (it->second)->checkValid(this);
1011//    }
1012  }
1013
1014  void CAxis::duplicateTransformation(CAxis* src)
1015  {
1016    if (src->hasTransformation())
1017    {
1018      this->setTransformations(src->getAllTransformations());
1019    }
1020  }
1021
1022  /*!
1023   * Go through the hierarchy to find the axis from which the transformations must be inherited
1024   */
1025  void CAxis::solveInheritanceTransformation()
1026  {
1027    if (hasTransformation() || !hasDirectAxisReference())
1028      return;
1029
1030    CAxis* axis = this;
1031    std::vector<CAxis*> refAxis;
1032    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
1033    {
1034      refAxis.push_back(axis);
1035      axis = axis->getDirectAxisReference();
1036    }
1037
1038    if (axis->hasTransformation())
1039      for (size_t i = 0; i < refAxis.size(); ++i)
1040        refAxis[i]->setTransformations(axis->getAllTransformations());
1041  }
1042
1043  void CAxis::parse(xml::CXMLNode & node)
1044  {
1045    SuperClass::parse(node);
1046
1047    if (node.goToChildElement())
1048    {
1049      StdString nodeElementName;
1050      do
1051      {
1052        StdString nodeId("");
1053        if (node.getAttributes().end() != node.getAttributes().find("id"))
1054        { nodeId = node.getAttributes()["id"]; }
1055
1056        nodeElementName = node.getElementName();
1057        std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_.end(), it;
1058        it = transformationMapList_.find(nodeElementName);
1059        if (ite != it)
1060        {
1061          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1062                                                                                                               nodeId,
1063                                                                                                               &node)));
1064        }
1065        else
1066        {
1067          ERROR("void CAxis::parse(xml::CXMLNode & node)",
1068                << "The transformation " << nodeElementName << " has not been supported yet.");
1069        }
1070      } while (node.goToNextElement()) ;
1071      node.goToParentElement();
1072    }
1073  }
1074
1075  DEFINE_REF_FUNC(Axis,axis)
1076
1077   ///---------------------------------------------------------------
1078
1079} // namespace xios
Note: See TracBrowser for help on using the repository browser.