source: XIOS/dev/branch_openmp/src/node/axis.cpp @ 1338

Last change on this file since 1338 was 1334, checked in by yushan, 6 years ago

omp_dev

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 38.1 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isClientAfterTransformationChecked(false)
27      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
29      , transformationMap_(), hasValue(false), hasLabel(false)
30   {
31   }
32
33   CAxis::CAxis(const StdString & id)
34      : CObjectTemplate<CAxis>(id)
35      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
36      , isClientAfterTransformationChecked(false)
37      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
38      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
39      , transformationMap_(), hasValue(false), hasLabel(false)
40   {
41   }
42
43   CAxis::~CAxis(void)
44   { /* Ne rien faire de plus */ }
45
46   std::map<StdString, ETranformationType> *CAxis::transformationMapList_ptr = 0;
47
48   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
49   {
50     m["zoom_axis"] = TRANS_ZOOM_AXIS;
51     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
52     m["inverse_axis"] = TRANS_INVERSE_AXIS;
53     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
54     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
55   }
56
57   bool CAxis::initializeTransformationMap()
58   {
59     if(CAxis::transformationMapList_ptr == 0) CAxis::transformationMapList_ptr = new std::map<StdString, ETranformationType>();
60     (*CAxis::transformationMapList_ptr)["zoom_axis"]        = TRANS_ZOOM_AXIS;
61     (*CAxis::transformationMapList_ptr)["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
62     (*CAxis::transformationMapList_ptr)["inverse_axis"]     = TRANS_INVERSE_AXIS;
63     (*CAxis::transformationMapList_ptr)["reduce_domain"]    = TRANS_REDUCE_DOMAIN_TO_AXIS;
64     (*CAxis::transformationMapList_ptr)["extract_domain"]   = TRANS_EXTRACT_DOMAIN_TO_AXIS;
65   }
66
67   ///---------------------------------------------------------------
68
69   const std::set<StdString> & CAxis::getRelFiles(void) const
70   {
71      return (this->relFiles);
72   }
73
74   bool CAxis::IsWritten(const StdString & filename) const
75   {
76      return (this->relFiles.find(filename) != this->relFiles.end());
77   }
78
79   bool CAxis::isWrittenCompressed(const StdString& filename) const
80   {
81      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
82   }
83
84   bool CAxis::isDistributed(void) const
85   {
86      return isDistributed_;
87   }
88
89   /*!
90    * Test whether the data defined on the axis can be outputted in a compressed way.
91    *
92    * \return true if and only if a mask was defined for this axis
93    */
94   bool CAxis::isCompressible(void) const
95   {
96      return isCompressible_;
97   }
98
99   void CAxis::addRelFile(const StdString & filename)
100   {
101      this->relFiles.insert(filename);
102   }
103
104   void CAxis::addRelFileCompressed(const StdString& filename)
105   {
106      this->relFilesCompressed.insert(filename);
107   }
108
109   //----------------------------------------------------------------
110
111   const std::vector<int>& CAxis::getIndexesToWrite(void) const
112   {
113     return indexesToWrite;
114   }
115
116   /*!
117     Returns the number of indexes written by each server.
118     \return the number of indexes written by each server
119   */
120   int CAxis::getNumberWrittenIndexes() const
121   {
122     return numberWrittenIndexes_;
123   }
124
125   /*!
126     Returns the total number of indexes written by the servers.
127     \return the total number of indexes written by the servers
128   */
129   int CAxis::getTotalNumberWrittenIndexes() const
130   {
131     return totalNumberWrittenIndexes_;
132   }
133
134   /*!
135     Returns the offset of indexes written by each server.
136     \return the offset of indexes written by each server
137   */
138   int CAxis::getOffsetWrittenIndexes() const
139   {
140     return offsetWrittenIndexes_;
141   }
142
143   //----------------------------------------------------------------
144
145   /*!
146    * Compute the minimum buffer size required to send the attributes to the server(s).
147    *
148    * \return A map associating the server rank with its minimum buffer size.
149    */
150   std::map<int, StdSize> CAxis::getAttributesBufferSize()
151   {
152     CContextClient* client = CContext::getCurrent()->client;
153
154     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
155
156     bool isNonDistributed = (n == n_glo);
157
158     if (client->isServerLeader())
159     {
160       // size estimation for sendServerAttribut
161       size_t size = 6 * sizeof(size_t);
162       // size estimation for sendNonDistributedValue
163       if (isNonDistributed)
164         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
165       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
166
167       const std::list<int>& ranks = client->getRanksServerLeader();
168       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
169       {
170         if (size > attributesSizes[*itRank])
171           attributesSizes[*itRank] = size;
172       }
173     }
174
175     if (!isNonDistributed)
176     {
177       // size estimation for sendDistributedValue
178       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
179       for (it = indSrv_.begin(); it != ite; ++it)
180       {
181         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
182         if (isCompressible_)
183           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
184
185         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
186         if (hasBounds_)
187           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
188 
189         if (hasLabel)
190           sizeValEvent += CArray<StdString,1>::size(it->second.size());
191
192         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
193         if (size > attributesSizes[it->first])
194           attributesSizes[it->first] = size;
195       }
196     }
197
198     return attributesSizes;
199   }
200
201   //----------------------------------------------------------------
202
203   StdString CAxis::GetName(void)   { return (StdString("axis")); }
204   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
205   ENodeType CAxis::GetType(void)   { return (eAxis); }
206
207   //----------------------------------------------------------------
208
209   CAxis* CAxis::createAxis()
210   {
211     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
212     return axis;
213   }
214
215   void CAxis::fillInValues(const CArray<double,1>& values)
216   {
217     this->value = values;
218   }
219
220   void CAxis::checkAttributes(void)
221   {
222      if (this->n_glo.isEmpty())
223        ERROR("CAxis::checkAttributes(void)",
224              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
225              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
226      StdSize size = this->n_glo.getValue();
227
228      if (!this->index.isEmpty())
229      {
230        if (n.isEmpty()) n = index.numElements();
231
232        // It's not so correct but if begin is not the first value of index
233        // then data on the local axis has user-defined distribution. In this case, begin has no meaning.
234        if (begin.isEmpty()) begin = index(0);         
235      }
236      else 
237      {
238        if (!this->begin.isEmpty())
239        {
240          if (begin < 0 || begin > size - 1)
241            ERROR("CAxis::checkAttributes(void)",
242                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
243                  << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
244        }
245        else this->begin.setValue(0);
246
247        if (!this->n.isEmpty())
248        {
249          if (n < 0 || n > size)
250            ERROR("CAxis::checkAttributes(void)",
251                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
252                  << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
253        }
254        else this->n.setValue(size);
255
256        {
257          index.resize(n);
258          for (int i = 0; i < n; ++i) index(i) = i+begin;
259        }
260      }
261
262      if (!this->value.isEmpty())
263      {
264        StdSize true_size = value.numElements();
265        if (this->n.getValue() != true_size)
266          ERROR("CAxis::checkAttributes(void)",
267                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
268                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
269        this->hasValue = true;
270      }
271
272      this->checkData();
273      this->checkZoom();
274      this->checkMask();
275      this->checkBounds();
276      this->checkLabel();
277
278      isDistributed_ = (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
279                       (!this->n.isEmpty() && (this->n != this->n_glo));
280
281      // A same stupid condition to make sure that if there is only one client, axis
282      // should be considered to be distributed. This should be a temporary solution     
283      isDistributed_ |= (1 == CContext::getCurrent()->client->clientSize);
284   }
285
286   void CAxis::checkData()
287   {
288      if (data_begin.isEmpty()) data_begin.setValue(0);
289
290      if (data_n.isEmpty())
291      {
292        data_n.setValue(n);
293      }
294      else if (data_n.getValue() < 0)
295      {
296        ERROR("CAxis::checkData(void)",
297              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
298              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
299      }
300
301      if (data_index.isEmpty())
302      {
303        data_index.resize(data_n);
304        for (int i = 0; i < data_n; ++i) data_index(i) = i;
305      }
306   }
307
308   void CAxis::checkZoom(void)
309   {
310     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
311     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
312   }
313
314   void CAxis::checkMask()
315   {
316      if (!mask.isEmpty())
317      {
318         if (mask.extent(0) != n)
319           ERROR("CAxis::checkMask(void)",
320                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
321                 << "The mask does not have the same size as the local domain." << std::endl
322                 << "Local size is " << n.getValue() << "." << std::endl
323                 << "Mask size is " << mask.extent(0) << ".");
324      }
325      else // (mask.isEmpty())
326      { // If no mask was defined, we create a default one without any masked point.
327         mask.resize(n);
328         for (int i = 0; i < n; ++i)
329         {
330           mask(i) = true;
331         }
332      }
333   }
334
335  void CAxis::checkBounds()
336  {
337    if (!bounds.isEmpty())
338    {
339      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
340        ERROR("CAxis::checkAttributes(void)",
341              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
342              << "Axis size is " << n.getValue() << "." << std::endl
343              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
344      hasBounds_ = true;
345    }
346    else hasBounds_ = false;
347  }
348
349  void CAxis::checkLabel()
350  {
351    if (!label.isEmpty())
352    {
353      if (label.extent(0) != n)
354        ERROR("CAxis::checkLabel(void)",
355              << "The label array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension of axis size." << std::endl
356              << "Axis size is " << n.getValue() << "." << std::endl
357              << "label size is "<< label.extent(0)<<  " .");
358      hasLabel = true;
359    }
360    else hasLabel = false;
361  }
362  void CAxis::checkEligibilityForCompressedOutput()
363  {
364    // We don't check if the mask is valid here, just if a mask has been defined at this point.
365    isCompressible_ = !mask.isEmpty();
366  }
367
368  bool CAxis::zoomByIndex()
369  {
370    return (!global_zoom_index.isEmpty() && (0 != global_zoom_index.numElements()));
371  }
372
373  bool CAxis::dispatchEvent(CEventServer& event)
374   {
375      if (SuperClass::dispatchEvent(event)) return true;
376      else
377      {
378        switch(event.type)
379        {
380           case EVENT_ID_SERVER_ATTRIBUT :
381             recvServerAttribut(event);
382             return true;
383             break;
384           case EVENT_ID_INDEX:
385            recvIndex(event);
386            return true;
387            break;
388          case EVENT_ID_DISTRIBUTED_VALUE:
389            recvDistributedValue(event);
390            return true;
391            break;
392          case EVENT_ID_NON_DISTRIBUTED_VALUE:
393            recvNonDistributedValue(event);
394            return true;
395            break;
396           default :
397             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
398                    << "Unknown Event");
399           return false;
400         }
401      }
402   }
403
404   void CAxis::checkAttributesOnClient()
405   {
406     if (this->areClientAttributesChecked_) return;
407
408     this->checkAttributes();
409
410     this->areClientAttributesChecked_ = true;
411   }
412
413   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
414                                                          CServerDistributionDescription::ServerDistributionType distType)
415   {
416     CContext* context=CContext::getCurrent() ;
417
418     if (this->isClientAfterTransformationChecked) return;
419     if (context->hasClient)
420     {
421       if (n.getValue() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
422     }
423
424     this->isClientAfterTransformationChecked = true;
425   }
426
427   // Send all checked attributes to server
428   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
429                                     CServerDistributionDescription::ServerDistributionType distType)
430   {
431     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
432     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
433     CContext* context = CContext::getCurrent();
434
435     if (this->isChecked) return;
436     if (context->hasClient)
437     {
438       sendServerAttribut(globalDim, orderPositionInGrid, distType);
439       if (hasValue) sendValue();
440     }
441
442     this->isChecked = true;
443   }
444
445  void CAxis::sendValue()
446  {
447     if (n.getValue() == n_glo.getValue())
448       sendNonDistributedValue();
449     else
450       sendDistributedValue();
451  }
452
453  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
454                                     CServerDistributionDescription::ServerDistributionType distType)
455  {
456    CContext* context = CContext::getCurrent();
457    CContextClient* client = context->client;
458    int nbServer = client->serverSize;
459    int range, clientSize = client->clientSize;
460    int rank = client->clientRank;
461
462    size_t ni = this->n.getValue();
463    size_t ibegin = this->begin.getValue();
464    size_t zoom_end = global_zoom_begin+global_zoom_n-1;
465    size_t nZoomCount = 0;
466    size_t nbIndex = index.numElements();
467
468    int end = (0 == n) ? begin : begin + n - 1;
469    int zoom_size = zoomByIndex() ? global_zoom_index.numElements() : global_zoom_n;
470    int minInd = min(index);
471    int maxInd = max(index);
472    for (size_t idx = 0; idx < zoom_size; ++idx)
473    {
474      size_t globalZoomIndex = zoomByIndex() ? global_zoom_index(idx) : global_zoom_begin + idx;
475      if (globalZoomIndex >= minInd && globalZoomIndex <= maxInd) ++nZoomCount;
476    }
477
478/*    for (size_t idx = 0; idx < nbIndex; ++idx)
479    {
480      size_t globalIndex = index(idx);
481      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
482    }*/
483   
484    CArray<size_t,1> globalIndexAxis(nbIndex);
485    for (size_t idx = 0; idx < nbIndex; ++idx)
486    {     
487      globalIndexAxis(idx) = (size_t)index(idx);
488    }
489
490    std::vector<size_t> globalAxisZoom(nZoomCount);
491    nZoomCount = 0;
492    for (size_t idx = 0; idx < zoom_size; ++idx)
493    {
494      size_t globalZoomIndex = zoomByIndex() ? global_zoom_index(idx) : global_zoom_begin + idx;
495      if (globalZoomIndex >= minInd && globalZoomIndex <= maxInd)
496      {
497        globalAxisZoom[nZoomCount] = globalZoomIndex;
498        ++nZoomCount;
499      }
500    }
501
502    std::set<int> writtenInd;
503    if (isCompressible_)
504    {
505      for (int idx = 0; idx < data_index.numElements(); ++idx)
506      {
507        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
508
509        if (ind >= 0 && ind < ni && mask(ind))
510        {
511          ind += ibegin;
512          if (ind >= global_zoom_begin && ind <= zoom_end)
513            writtenInd.insert(ind);
514        }
515      }
516    }
517
518    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
519    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
520    CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
521    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
522    {
523      std::vector<int> nGlobAxis(1);
524      nGlobAxis[0] = n_glo.getValue();
525
526      size_t globalSizeIndex = 1, indexBegin, indexEnd;
527      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
528      indexBegin = 0;
529      if (globalSizeIndex <= clientSize)
530      {
531        indexBegin = rank%globalSizeIndex;
532        indexEnd = indexBegin;
533      }
534      else
535      {
536        for (int i = 0; i < clientSize; ++i)
537        {
538          range = globalSizeIndex / clientSize;
539          if (i < (globalSizeIndex%clientSize)) ++range;
540          if (i == client->clientRank) break;
541          indexBegin += range;
542        }
543        indexEnd = indexBegin + range - 1;
544      }
545
546      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
547      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
548      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
549      clientServerMap->computeServerIndexMapping(globalIndexAxis);
550      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
551      delete clientServerMap;
552    }
553    else
554    {
555      std::vector<size_t> globalIndexServer(n_glo.getValue());
556      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
557      {
558        globalIndexServer[idx] = idx;
559      }
560
561      for (int idx = 0; idx < nbServer; ++idx)
562      {
563        globalIndexAxisOnServer[idx] = globalIndexServer;
564      }
565    }
566
567    CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(),
568                                                         ite = globalIndexAxisOnServer.end();
569    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
570                                        iteVec = (globalAxisZoom).end();
571    indSrv_.clear();
572    indWrittenSrv_.clear();
573    for (; it != ite; ++it)
574    {
575      int rank = it->first;
576      const std::vector<size_t>& globalIndexTmp = it->second;
577      int nb = globalIndexTmp.size();
578
579      for (int i = 0; i < nb; ++i)
580      {
581        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
582        {
583          indSrv_[rank].push_back(globalIndexTmp[i]);
584        }
585
586        if (writtenInd.count(globalIndexTmp[i]))
587        {
588          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
589        }
590      }
591    }
592
593    connectedServerRank_.clear();
594    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
595      connectedServerRank_.push_back(it->first);
596    }
597
598    if (!indSrv_.empty())
599    {
600      std::map<int, vector<size_t> >::const_iterator itIndSrv  = indSrv_.begin(),
601                                                     iteIndSrv = indSrv_.end();
602      connectedServerRank_.clear();
603      for (; itIndSrv != iteIndSrv; ++itIndSrv)
604        connectedServerRank_.push_back(itIndSrv->first);
605    }
606    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
607  }
608
609  void CAxis::sendNonDistributedValue()
610  {
611    CContext* context = CContext::getCurrent();
612    CContextClient* client = context->client;
613    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
614
615    int zoom_end = global_zoom_begin + global_zoom_n - 1;
616    int nb = 0;
617/*    for (size_t idx = 0; idx < n; ++idx)
618    {
619      size_t globalIndex = begin + idx;
620      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
621    }*/
622
623    int end = (0 == n) ? begin : begin + n - 1;
624    int zoom_size = zoomByIndex() ? global_zoom_index.numElements() : global_zoom_n;
625    for (size_t idx = 0; idx < zoom_size; ++idx)
626    {
627      size_t globalZoomIndex = zoomByIndex() ? global_zoom_index(idx) : global_zoom_begin + idx;
628      if (globalZoomIndex >= begin && globalZoomIndex <= end) ++nb;
629    }
630
631    int nbWritten = 0;
632    if (isCompressible_)
633    {
634      for (int idx = 0; idx < data_index.numElements(); ++idx)
635      {
636        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
637
638        if (ind >= 0 && ind < n && mask(ind))
639        {
640          ind += begin;
641          if (ind >= global_zoom_begin && ind <= zoom_end)
642            ++nbWritten;
643        }
644      }
645    }
646
647    CArray<double,1> val(nb);
648    nb = 0;
649/*    for (size_t idx = 0; idx < n; ++idx)
650    {
651      size_t globalIndex = begin + idx;
652      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
653      {
654        val(nb) = value(idx);
655        ++nb;
656      }
657    }*/
658
659    for (size_t idx = 0; idx < zoom_size; ++idx)
660    {
661      size_t globalZoomIndex = zoomByIndex() ? global_zoom_index(idx) : global_zoom_begin + idx;
662      if (globalZoomIndex >= begin && globalZoomIndex <= end)
663      {
664        val(nb) = value(globalZoomIndex-begin);
665        ++nb;
666      }
667    }
668
669    CArray<int, 1> writtenInd(nbWritten);
670    nbWritten = 0;
671    if (isCompressible_)
672    {
673      for (int idx = 0; idx < data_index.numElements(); ++idx)
674      {
675        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
676
677        if (ind >= 0 && ind < n && mask(ind))
678        {
679          ind += begin;
680          if (ind >= global_zoom_begin && ind <= zoom_end)
681          {
682            writtenInd(nbWritten) = ind;
683            ++nbWritten;
684          }
685        }
686      }
687    }
688
689    if (client->isServerLeader())
690    {
691      std::list<CMessage> msgs;
692
693      const std::list<int>& ranks = client->getRanksServerLeader();
694      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
695      {
696        msgs.push_back(CMessage());
697        CMessage& msg = msgs.back();
698        msg << this->getId();
699        msg << val;
700        if (isCompressible_)
701          msg << writtenInd;
702        event.push(*itRank, 1, msg);
703      }
704      client->sendEvent(event);
705    }
706    else client->sendEvent(event);
707  }
708
709  void CAxis::sendDistributedValue(void)
710  {
711    int ns, n, i, j, ind, nv, idx;
712    CContext* context = CContext::getCurrent();
713    CContextClient* client=context->client;
714
715    // send value for each connected server
716    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
717    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
718
719    list<CMessage> list_msgsIndex, list_msgsVal;
720    list<CArray<int,1> > list_indi;
721    list<CArray<int,1> > list_writtenInd;
722    list<CArray<double,1> > list_val;
723    list<CArray<double,2> > list_bounds;
724    list<CArray<StdString,1> > list_label;
725
726    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
727    iteMap = indSrv_.end();
728    for (int k = 0; k < connectedServerRank_.size(); ++k)
729    {
730      int nbData = 0;
731      int rank = connectedServerRank_[k];
732      it = indSrv_.find(rank);
733      if (iteMap != it)
734        nbData = it->second.size();
735
736      list_indi.push_back(CArray<int,1>(nbData));
737      list_val.push_back(CArray<double,1>(nbData));
738
739      if (hasBounds_)
740      {
741        list_bounds.push_back(CArray<double,2>(2,nbData));
742      }
743     
744      if (hasLabel)
745      {
746        list_label.push_back(CArray<StdString,1>(nbData));
747      }
748
749      CArray<int,1>& indi = list_indi.back();
750      CArray<double,1>& val = list_val.back();
751
752      for (n = 0; n < nbData; ++n)
753      {
754        idx = static_cast<int>(it->second[n]);
755        ind = idx - begin;
756
757        val(n) = value(ind);
758        indi(n) = idx;
759
760        if (hasBounds_)
761        {
762          CArray<double,2>& boundsVal = list_bounds.back();
763          boundsVal(0, n) = bounds(0,n);
764          boundsVal(1, n) = bounds(1,n);
765        }
766       
767        if (hasLabel)
768        {
769          CArray<StdString,1>& labelVal = list_label.back();
770          labelVal(n) = label(n);
771        }
772      }
773
774      list_msgsIndex.push_back(CMessage());
775      list_msgsIndex.back() << this->getId() << list_indi.back();
776
777      if (isCompressible_)
778      {
779        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
780        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
781        CArray<int,1>& writtenInd = list_writtenInd.back();
782
783        for (n = 0; n < writtenInd.numElements(); ++n)
784          writtenInd(n) = writtenIndSrc[n];
785
786        list_msgsIndex.back() << writtenInd;
787      }
788
789      list_msgsVal.push_back(CMessage());
790      list_msgsVal.back() << this->getId() << list_val.back();
791
792      if (hasBounds_)
793      {
794        list_msgsVal.back() << list_bounds.back();
795      }
796 
797      if (hasLabel)
798      {
799        list_msgsVal.back() << list_label.back();
800      }
801
802      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
803      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
804    }
805
806    client->sendEvent(eventIndex);
807    client->sendEvent(eventVal);
808  }
809
810  void CAxis::recvIndex(CEventServer& event)
811  {
812    CAxis* axis;
813
814    list<CEventServer::SSubEvent>::iterator it;
815    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
816    {
817      CBufferIn* buffer = it->buffer;
818      string axisId;
819      *buffer >> axisId;
820      axis = get(axisId);
821      axis->recvIndex(it->rank, *buffer);
822    }
823
824    if (axis->isCompressible_)
825    {
826      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
827
828      CContextServer* server = CContext::getCurrent()->server;
829      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
830      MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
831      MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
832      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
833    }
834  }
835
836  void CAxis::recvIndex(int rank, CBufferIn& buffer)
837  {
838    buffer >> indiSrv_[rank];
839
840    if (isCompressible_)
841    {
842      CArray<int, 1> writtenIndexes;
843      buffer >> writtenIndexes;
844      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
845      for (int i = 0; i < writtenIndexes.numElements(); ++i)
846        indexesToWrite.push_back(writtenIndexes(i));
847    }
848  }
849
850  void CAxis::recvDistributedValue(CEventServer& event)
851  {
852    list<CEventServer::SSubEvent>::iterator it;
853    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
854    {
855      CBufferIn* buffer = it->buffer;
856      string axisId;
857      *buffer >> axisId;
858      get(axisId)->recvDistributedValue(it->rank, *buffer);
859    }
860  }
861
862  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
863  {
864    CArray<int,1> &indi = indiSrv_[rank];
865    CArray<double,1> val;
866    CArray<double,2> boundsVal;
867    CArray<StdString,1> labelVal;
868
869    buffer >> val;
870    if (hasBounds_) buffer >> boundsVal;
871    if (hasLabel) buffer >> labelVal;
872
873    int i, j, ind_srv;
874    for (int ind = 0; ind < indi.numElements(); ++ind)
875    {
876      i = indi(ind);
877      ind_srv = i - zoom_begin_srv;
878      value_srv(ind_srv) = val(ind);
879      if (hasBounds_)
880      {
881        bound_srv(0,ind_srv) = boundsVal(0, ind);
882        bound_srv(1,ind_srv) = boundsVal(1, ind);
883      }
884
885      if (hasLabel)
886      {
887        label_srv(ind_srv) = labelVal( ind);
888      }
889
890    }
891  }
892
893   void CAxis::recvNonDistributedValue(CEventServer& event)
894  {
895    CAxis* axis;
896
897    list<CEventServer::SSubEvent>::iterator it;
898    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
899    {
900      CBufferIn* buffer = it->buffer;
901      string axisId;
902      *buffer >> axisId;
903      axis = get(axisId);
904      axis->recvNonDistributedValue(it->rank, *buffer);
905    }
906
907    if (axis->isCompressible_)
908    {
909      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
910
911      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
912      axis->offsetWrittenIndexes_ = 0;
913    }
914  }
915
916  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
917  {
918    CArray<double,1> val;
919    buffer >> val;
920
921    for (int ind = 0; ind < val.numElements(); ++ind)
922    {
923      value_srv(ind) = val(ind);
924      if (hasBounds_)
925      {
926        bound_srv(0,ind) = bounds(0,ind);
927        bound_srv(1,ind) = bounds(1,ind);
928      }
929      if (hasLabel)
930      {
931        label_srv(ind) = label(ind);
932      }
933    }
934
935    if (isCompressible_)
936    {
937      CArray<int, 1> writtenIndexes;
938      buffer >> writtenIndexes;
939      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
940      for (int i = 0; i < writtenIndexes.numElements(); ++i)
941        indexesToWrite.push_back(writtenIndexes(i));
942    }
943  }
944
945  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
946                                 CServerDistributionDescription::ServerDistributionType distType)
947  {
948    CContext* context = CContext::getCurrent();
949    CContextClient* client = context->client;
950    int nbServer = client->serverSize;
951
952    CServerDistributionDescription serverDescription(globalDim, nbServer);
953    serverDescription.computeServerDistribution();
954
955    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
956    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
957
958    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
959    if (client->isServerLeader())
960    {
961      std::list<CMessage> msgs;
962
963      const std::list<int>& ranks = client->getRanksServerLeader();
964      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
965      {
966        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
967        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
968        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
969        const int end   = begin + ni - 1;
970        const bool zoomIndex = zoomByIndex();
971
972        msgs.push_back(CMessage());
973        CMessage& msg = msgs.back();
974        msg << this->getId();
975        msg << ni << begin << end;
976        msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
977        msg << isCompressible_;
978        msg << zoomIndex;
979        if (zoomIndex)
980          msg << global_zoom_index.getValue();
981
982        event.push(*itRank,1,msg);
983      }
984      client->sendEvent(event);
985    }
986    else client->sendEvent(event);
987  }
988
989  void CAxis::recvServerAttribut(CEventServer& event)
990  {
991    CBufferIn* buffer = event.subEvents.begin()->buffer;
992    string axisId;
993    *buffer >> axisId;
994    get(axisId)->recvServerAttribut(*buffer);
995  }
996
997  void CAxis::recvServerAttribut(CBufferIn& buffer)
998  {
999    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
1000    bool zoomIndex;   
1001    CArray<int,1> zoom_index_recv;
1002    std::vector<int> zoom_index_tmp;
1003    std::vector<int>::iterator itZoomBeginSrv, itZoomEndSrv, itZoomSrv;
1004
1005    buffer >> ni_srv >> begin_srv >> end_srv;
1006    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
1007    buffer >> isCompressible_;
1008    buffer >> zoomIndex;
1009    if (zoomIndex)
1010    {
1011      buffer >> zoom_index_recv;
1012      global_zoom_index.reference(zoom_index_recv);
1013      zoom_index_tmp.resize(global_zoom_index.numElements());
1014      std::copy(global_zoom_index.begin(), global_zoom_index.end(), zoom_index_tmp.begin());
1015      std::sort(zoom_index_tmp.begin(), zoom_index_tmp.end());
1016      itZoomBeginSrv = std::lower_bound(zoom_index_tmp.begin(), zoom_index_tmp.end(), begin_srv);
1017      itZoomEndSrv   = std::upper_bound(zoom_index_tmp.begin(), zoom_index_tmp.end(), end_srv);     
1018      int sz = std::distance(itZoomBeginSrv, itZoomEndSrv);
1019      zoom_index_srv.resize(sz);
1020      itZoomSrv = itZoomBeginSrv;
1021      for (int i = 0; i < sz; ++i, ++itZoomSrv)
1022      {
1023        zoom_index_srv(i) = *(itZoomSrv);
1024      }
1025    }
1026
1027    global_zoom_begin = global_zoom_begin_tmp;
1028    global_zoom_n  = global_zoom_n_tmp;
1029    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
1030
1031    zoom_begin_srv = zoomIndex ? std::distance(itZoomBeginSrv, zoom_index_tmp.begin())
1032                                 : global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
1033    zoom_end_srv   = zoomIndex ? std::distance(zoom_index_tmp.begin(), itZoomEndSrv) - 1 
1034                                 : global_zoom_end < end_srv ? global_zoom_end : end_srv ;
1035    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
1036     
1037    global_zoom_begin_srv = zoomIndex ? 0 : global_zoom_begin ;
1038    global_zoom_size_srv  = zoomIndex ? zoom_index_tmp.size() : global_zoom_n;
1039
1040    if (zoom_size_srv<=0)
1041    {
1042      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
1043    }
1044
1045    if (n_glo == n)
1046    {
1047      zoom_begin_srv = zoomIndex ? std::distance(itZoomBeginSrv, zoom_index_tmp.begin())
1048                                   : global_zoom_begin;     
1049      zoom_size_srv  = zoomIndex ? zoom_index_tmp.size()
1050                                   : global_zoom_n;
1051    }
1052    if (hasValue)
1053    {
1054      value_srv.resize(zoom_size_srv);
1055      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
1056      if (hasLabel)  label_srv.resize(zoom_size_srv);
1057    }
1058  }
1059
1060  /*!
1061    Compare two axis objects.
1062    They are equal if only if they have identical attributes as well as their values.
1063    Moreover, they must have the same transformations.
1064  \param [in] axis Compared axis
1065  \return result of the comparison
1066  */
1067  bool CAxis::isEqual(CAxis* obj)
1068  {
1069    vector<StdString> excludedAttr;
1070    excludedAttr.push_back("axis_ref");
1071
1072    bool objEqual = SuperClass::isEqual(obj, excludedAttr);   
1073    if (!objEqual) return objEqual;
1074
1075    TransMapTypes thisTrans = this->getAllTransformations();
1076    TransMapTypes objTrans  = obj->getAllTransformations();
1077
1078    TransMapTypes::const_iterator it, itb, ite;
1079    std::vector<ETranformationType> thisTransType, objTransType;
1080    for (it = thisTrans.begin(); it != thisTrans.end(); ++it)
1081      thisTransType.push_back(it->first);
1082    for (it = objTrans.begin(); it != objTrans.end(); ++it)
1083      objTransType.push_back(it->first);
1084
1085    if (thisTransType.size() != objTransType.size()) return false;
1086    for (int idx = 0; idx < thisTransType.size(); ++idx)
1087      objEqual &= (thisTransType[idx] == objTransType[idx]);
1088
1089    return objEqual;
1090  }
1091
1092  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
1093  {
1094    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
1095    return transformationMap_.back().second;
1096  }
1097
1098  bool CAxis::hasTransformation()
1099  {
1100    return (!transformationMap_.empty());
1101  }
1102
1103  void CAxis::setTransformations(const TransMapTypes& axisTrans)
1104  {
1105    transformationMap_ = axisTrans;
1106  }
1107
1108  CAxis::TransMapTypes CAxis::getAllTransformations(void)
1109  {
1110    return transformationMap_;
1111  }
1112
1113  void CAxis::duplicateTransformation(CAxis* src)
1114  {
1115    if (src->hasTransformation())
1116    {
1117      this->setTransformations(src->getAllTransformations());
1118    }
1119  }
1120
1121  /*!
1122   * Go through the hierarchy to find the axis from which the transformations must be inherited
1123   */
1124  void CAxis::solveInheritanceTransformation()
1125  {
1126    if (hasTransformation() || !hasDirectAxisReference())
1127      return;
1128
1129    CAxis* axis = this;
1130    std::vector<CAxis*> refAxis;
1131    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
1132    {
1133      refAxis.push_back(axis);
1134      axis = axis->getDirectAxisReference();
1135    }
1136
1137    if (axis->hasTransformation())
1138      for (size_t i = 0; i < refAxis.size(); ++i)
1139        refAxis[i]->setTransformations(axis->getAllTransformations());
1140  }
1141
1142  void CAxis::parse(xml::CXMLNode & node)
1143  {
1144    SuperClass::parse(node);
1145
1146    if (node.goToChildElement())
1147    {
1148      StdString nodeElementName;
1149      do
1150      {
1151        StdString nodeId("");
1152        if (node.getAttributes().end() != node.getAttributes().find("id"))
1153        { nodeId = node.getAttributes()["id"]; }
1154
1155        nodeElementName = node.getElementName();
1156
1157        if(transformationMapList_ptr == 0) initializeTransformationMap();
1158        std::map<StdString, ETranformationType>::const_iterator ite = (*CAxis::transformationMapList_ptr).end(), it;
1159        it = (*CAxis::transformationMapList_ptr).find(nodeElementName);
1160        if (ite != it)
1161        {
1162          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1163                                                                                                               nodeId,
1164                                                                                                               &node)));
1165        }
1166        else
1167        {
1168          ERROR("void CAxis::parse(xml::CXMLNode & node)",
1169                << "The transformation " << nodeElementName << " has not been supported yet.");
1170        }
1171      } while (node.goToNextElement()) ;
1172      node.goToParentElement();
1173    }
1174  }
1175
1176  DEFINE_REF_FUNC(Axis,axis)
1177
1178   ///---------------------------------------------------------------
1179
1180} // namespace xios
Note: See TracBrowser for help on using the repository browser.