source: XIOS/dev/branch_openmp/src/node/axis.cpp @ 1328

Last change on this file since 1328 was 1328, checked in by yushan, 6 years ago

dev_omp

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 38.5 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isClientAfterTransformationChecked(false)
27      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
29      , transformationMap_(), hasValue(false), hasLabel(false)
30   {
31   }
32
33   CAxis::CAxis(const StdString & id)
34      : CObjectTemplate<CAxis>(id)
35      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
36      , isClientAfterTransformationChecked(false)
37      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
38      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
39      , transformationMap_(), hasValue(false), hasLabel(false)
40   {
41   }
42
43   CAxis::~CAxis(void)
44   { /* Ne rien faire de plus */ }
45
46   //std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
47   std::map<StdString, ETranformationType> *CAxis::transformationMapList_ptr = 0;
48   //bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_);
49   
50   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
51   {
52     m["zoom_axis"] = TRANS_ZOOM_AXIS;
53     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
54     m["inverse_axis"] = TRANS_INVERSE_AXIS;
55     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
56     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
57   }
58
59   bool CAxis::initializeTransformationMap()
60   {
61     if(CAxis::transformationMapList_ptr == 0) CAxis::transformationMapList_ptr = new std::map<StdString, ETranformationType>();
62     (*CAxis::transformationMapList_ptr)["zoom_axis"]        = TRANS_ZOOM_AXIS;
63     (*CAxis::transformationMapList_ptr)["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
64     (*CAxis::transformationMapList_ptr)["inverse_axis"]     = TRANS_INVERSE_AXIS;
65     (*CAxis::transformationMapList_ptr)["reduce_domain"]    = TRANS_REDUCE_DOMAIN_TO_AXIS;
66     (*CAxis::transformationMapList_ptr)["extract_domain"]   = TRANS_EXTRACT_DOMAIN_TO_AXIS;
67   }
68
69   ///---------------------------------------------------------------
70
71   const std::set<StdString> & CAxis::getRelFiles(void) const
72   {
73      return (this->relFiles);
74   }
75
76   bool CAxis::IsWritten(const StdString & filename) const
77   {
78      return (this->relFiles.find(filename) != this->relFiles.end());
79   }
80
81   bool CAxis::isWrittenCompressed(const StdString& filename) const
82   {
83      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
84   }
85
86   bool CAxis::isDistributed(void) const
87   {
88      return isDistributed_;
89   }
90
91   /*!
92    * Test whether the data defined on the axis can be outputted in a compressed way.
93    *
94    * \return true if and only if a mask was defined for this axis
95    */
96   bool CAxis::isCompressible(void) const
97   {
98      return isCompressible_;
99   }
100
101   void CAxis::addRelFile(const StdString & filename)
102   {
103      this->relFiles.insert(filename);
104   }
105
106   void CAxis::addRelFileCompressed(const StdString& filename)
107   {
108      this->relFilesCompressed.insert(filename);
109   }
110
111   //----------------------------------------------------------------
112
113   const std::vector<int>& CAxis::getIndexesToWrite(void) const
114   {
115     return indexesToWrite;
116   }
117
118   /*!
119     Returns the number of indexes written by each server.
120     \return the number of indexes written by each server
121   */
122   int CAxis::getNumberWrittenIndexes() const
123   {
124     return numberWrittenIndexes_;
125   }
126
127   /*!
128     Returns the total number of indexes written by the servers.
129     \return the total number of indexes written by the servers
130   */
131   int CAxis::getTotalNumberWrittenIndexes() const
132   {
133     return totalNumberWrittenIndexes_;
134   }
135
136   /*!
137     Returns the offset of indexes written by each server.
138     \return the offset of indexes written by each server
139   */
140   int CAxis::getOffsetWrittenIndexes() const
141   {
142     return offsetWrittenIndexes_;
143   }
144
145   //----------------------------------------------------------------
146
147   /*!
148    * Compute the minimum buffer size required to send the attributes to the server(s).
149    *
150    * \return A map associating the server rank with its minimum buffer size.
151    */
152   std::map<int, StdSize> CAxis::getAttributesBufferSize()
153   {
154     CContextClient* client = CContext::getCurrent()->client;
155
156     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
157
158     bool isNonDistributed = (n == n_glo);
159
160     if (client->isServerLeader())
161     {
162       // size estimation for sendServerAttribut
163       size_t size = 6 * sizeof(size_t);
164       // size estimation for sendNonDistributedValue
165       if (isNonDistributed)
166         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
167       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
168
169       const std::list<int>& ranks = client->getRanksServerLeader();
170       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
171       {
172         if (size > attributesSizes[*itRank])
173           attributesSizes[*itRank] = size;
174       }
175     }
176
177     if (!isNonDistributed)
178     {
179       // size estimation for sendDistributedValue
180       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
181       for (it = indSrv_.begin(); it != ite; ++it)
182       {
183         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
184         if (isCompressible_)
185           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
186
187         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
188         if (hasBounds_)
189           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
190 
191         if (hasLabel)
192           sizeValEvent += CArray<StdString,1>::size(it->second.size());
193
194         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
195         if (size > attributesSizes[it->first])
196           attributesSizes[it->first] = size;
197       }
198     }
199
200     return attributesSizes;
201   }
202
203   //----------------------------------------------------------------
204
205   StdString CAxis::GetName(void)   { return (StdString("axis")); }
206   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
207   ENodeType CAxis::GetType(void)   { return (eAxis); }
208
209   //----------------------------------------------------------------
210
211   CAxis* CAxis::createAxis()
212   {
213     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
214     return axis;
215   }
216
217   void CAxis::fillInValues(const CArray<double,1>& values)
218   {
219     this->value = values;
220   }
221
222   void CAxis::checkAttributes(void)
223   {
224      if (this->n_glo.isEmpty())
225        ERROR("CAxis::checkAttributes(void)",
226              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
227              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
228      StdSize size = this->n_glo.getValue();
229
230      if (!this->index.isEmpty())
231      {
232        if (n.isEmpty()) n = index.numElements();
233
234        // It's not so correct but if begin is not the first value of index
235        // then data on the local axis has user-defined distribution. In this case, begin has no meaning.
236        if (begin.isEmpty()) begin = index(0);         
237      }
238      else 
239      {
240        if (!this->begin.isEmpty())
241        {
242          if (begin < 0 || begin > size - 1)
243            ERROR("CAxis::checkAttributes(void)",
244                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
245                  << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
246        }
247        else this->begin.setValue(0);
248
249        if (!this->n.isEmpty())
250        {
251          if (n < 0 || n > size)
252            ERROR("CAxis::checkAttributes(void)",
253                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
254                  << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
255        }
256        else this->n.setValue(size);
257
258        {
259          index.resize(n);
260          for (int i = 0; i < n; ++i) index(i) = i+begin;
261        }
262      }
263
264      if (!this->value.isEmpty())
265      {
266        StdSize true_size = value.numElements();
267        if (this->n.getValue() != true_size)
268          ERROR("CAxis::checkAttributes(void)",
269                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
270                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
271        this->hasValue = true;
272      }
273
274      this->checkData();
275      this->checkZoom();
276      this->checkMask();
277      this->checkBounds();
278      this->checkLabel();
279
280      isDistributed_ = (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
281                       (!this->n.isEmpty() && (this->n != this->n_glo));
282
283      // A same stupid condition to make sure that if there is only one client, axis
284      // should be considered to be distributed. This should be a temporary solution     
285      isDistributed_ |= (1 == CContext::getCurrent()->client->clientSize);
286   }
287
288   void CAxis::checkData()
289   {
290      if (data_begin.isEmpty()) data_begin.setValue(0);
291
292      if (data_n.isEmpty())
293      {
294        data_n.setValue(n);
295      }
296      else if (data_n.getValue() < 0)
297      {
298        ERROR("CAxis::checkData(void)",
299              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
300              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
301      }
302
303      if (data_index.isEmpty())
304      {
305        data_index.resize(data_n);
306        for (int i = 0; i < data_n; ++i) data_index(i) = i;
307      }
308   }
309
310   void CAxis::checkZoom(void)
311   {
312     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
313     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
314   }
315
316   void CAxis::checkMask()
317   {
318      if (!mask.isEmpty())
319      {
320         if (mask.extent(0) != n)
321           ERROR("CAxis::checkMask(void)",
322                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
323                 << "The mask does not have the same size as the local domain." << std::endl
324                 << "Local size is " << n.getValue() << "." << std::endl
325                 << "Mask size is " << mask.extent(0) << ".");
326      }
327      else // (mask.isEmpty())
328      { // If no mask was defined, we create a default one without any masked point.
329         mask.resize(n);
330         for (int i = 0; i < n; ++i)
331         {
332           mask(i) = true;
333         }
334      }
335   }
336
337  void CAxis::checkBounds()
338  {
339    if (!bounds.isEmpty())
340    {
341      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
342        ERROR("CAxis::checkAttributes(void)",
343              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
344              << "Axis size is " << n.getValue() << "." << std::endl
345              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
346      hasBounds_ = true;
347    }
348    else hasBounds_ = false;
349  }
350
351  void CAxis::checkLabel()
352  {
353    if (!label.isEmpty())
354    {
355      if (label.extent(0) != n)
356        ERROR("CAxis::checkLabel(void)",
357              << "The label array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension of axis size." << std::endl
358              << "Axis size is " << n.getValue() << "." << std::endl
359              << "label size is "<< label.extent(0)<<  " .");
360      hasLabel = true;
361    }
362    else hasLabel = false;
363  }
364  void CAxis::checkEligibilityForCompressedOutput()
365  {
366    // We don't check if the mask is valid here, just if a mask has been defined at this point.
367    isCompressible_ = !mask.isEmpty();
368  }
369
370  bool CAxis::zoomByIndex()
371  {
372    return (!global_zoom_index.isEmpty() && (0 != global_zoom_index.numElements()));
373  }
374
375  bool CAxis::dispatchEvent(CEventServer& event)
376   {
377      if (SuperClass::dispatchEvent(event)) return true;
378      else
379      {
380        switch(event.type)
381        {
382           case EVENT_ID_SERVER_ATTRIBUT :
383             recvServerAttribut(event);
384             return true;
385             break;
386           case EVENT_ID_INDEX:
387            recvIndex(event);
388            return true;
389            break;
390          case EVENT_ID_DISTRIBUTED_VALUE:
391            recvDistributedValue(event);
392            return true;
393            break;
394          case EVENT_ID_NON_DISTRIBUTED_VALUE:
395            recvNonDistributedValue(event);
396            return true;
397            break;
398           default :
399             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
400                    << "Unknown Event");
401           return false;
402         }
403      }
404   }
405
406   void CAxis::checkAttributesOnClient()
407   {
408     if (this->areClientAttributesChecked_) return;
409
410     this->checkAttributes();
411
412     this->areClientAttributesChecked_ = true;
413   }
414
415   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
416                                                          CServerDistributionDescription::ServerDistributionType distType)
417   {
418     CContext* context=CContext::getCurrent() ;
419
420     if (this->isClientAfterTransformationChecked) return;
421     if (context->hasClient)
422     {
423       if (n.getValue() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
424     }
425
426     this->isClientAfterTransformationChecked = true;
427   }
428
429   // Send all checked attributes to server
430   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
431                                     CServerDistributionDescription::ServerDistributionType distType)
432   {
433     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
434     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
435     CContext* context = CContext::getCurrent();
436
437     if (this->isChecked) return;
438     if (context->hasClient)
439     {
440       sendServerAttribut(globalDim, orderPositionInGrid, distType);
441       if (hasValue) sendValue();
442     }
443
444     this->isChecked = true;
445   }
446
447  void CAxis::sendValue()
448  {
449     if (n.getValue() == n_glo.getValue())
450       sendNonDistributedValue();
451     else
452       sendDistributedValue();
453  }
454
455  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
456                                     CServerDistributionDescription::ServerDistributionType distType)
457  {
458    CContext* context = CContext::getCurrent();
459    CContextClient* client = context->client;
460    int nbServer = client->serverSize;
461    int range, clientSize = client->clientSize;
462    int rank = client->clientRank;
463
464    size_t ni = this->n.getValue();
465    size_t ibegin = this->begin.getValue();
466    size_t zoom_end = global_zoom_begin+global_zoom_n-1;
467    size_t nZoomCount = 0;
468    size_t nbIndex = index.numElements();
469
470    int end = (0 == n) ? begin : begin + n - 1;
471    int zoom_size = zoomByIndex() ? global_zoom_index.numElements() : global_zoom_n;
472    int minInd = min(index);
473    int maxInd = max(index);
474    for (size_t idx = 0; idx < zoom_size; ++idx)
475    {
476      size_t globalZoomIndex = zoomByIndex() ? global_zoom_index(idx) : global_zoom_begin + idx;
477      if (globalZoomIndex >= minInd && globalZoomIndex <= maxInd) ++nZoomCount;
478    }
479
480/*    for (size_t idx = 0; idx < nbIndex; ++idx)
481    {
482      size_t globalIndex = index(idx);
483      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
484    }*/
485   
486    CArray<size_t,1> globalIndexAxis(nbIndex);
487    for (size_t idx = 0; idx < nbIndex; ++idx)
488    {     
489      globalIndexAxis(idx) = (size_t)index(idx);
490    }
491
492    std::vector<size_t> globalAxisZoom(nZoomCount);
493    nZoomCount = 0;
494    for (size_t idx = 0; idx < zoom_size; ++idx)
495    {
496      size_t globalZoomIndex = zoomByIndex() ? global_zoom_index(idx) : global_zoom_begin + idx;
497      if (globalZoomIndex >= minInd && globalZoomIndex <= maxInd)
498      {
499        globalAxisZoom[nZoomCount] = globalZoomIndex;
500        ++nZoomCount;
501      } 
502    }
503
504    std::set<int> writtenInd;
505    if (isCompressible_)
506    {
507      for (int idx = 0; idx < data_index.numElements(); ++idx)
508      {
509        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
510
511        if (ind >= 0 && ind < ni && mask(ind))
512        {
513          ind += ibegin;
514          if (ind >= global_zoom_begin && ind <= zoom_end)
515            writtenInd.insert(ind);
516        }
517      }
518    }
519
520    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
521    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
522    CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
523    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
524    {
525      std::vector<int> nGlobAxis(1);
526      nGlobAxis[0] = n_glo.getValue();
527
528      size_t globalSizeIndex = 1, indexBegin, indexEnd;
529      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
530      indexBegin = 0;
531      if (globalSizeIndex <= clientSize)
532      {
533        indexBegin = rank%globalSizeIndex;
534        indexEnd = indexBegin;
535      }
536      else
537      {
538        for (int i = 0; i < clientSize; ++i)
539        {
540          range = globalSizeIndex / clientSize;
541          if (i < (globalSizeIndex%clientSize)) ++range;
542          if (i == client->clientRank) break;
543          indexBegin += range;
544        }
545        indexEnd = indexBegin + range - 1;
546      }
547
548      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
549      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
550      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
551      clientServerMap->computeServerIndexMapping(globalIndexAxis);
552      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
553      delete clientServerMap;
554    }
555    else
556    {
557      std::vector<size_t> globalIndexServer(n_glo.getValue());
558      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
559      {
560        globalIndexServer[idx] = idx;
561      }
562
563      for (int idx = 0; idx < nbServer; ++idx)
564      {
565        globalIndexAxisOnServer[idx] = globalIndexServer;
566      }
567    }
568
569    CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(),
570                                                         ite = globalIndexAxisOnServer.end();
571    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
572                                        iteVec = (globalAxisZoom).end();
573    indSrv_.clear();
574    indWrittenSrv_.clear();
575    for (; it != ite; ++it)
576    {
577      int rank = it->first;
578      const std::vector<size_t>& globalIndexTmp = it->second;
579      int nb = globalIndexTmp.size();
580
581      for (int i = 0; i < nb; ++i)
582      {
583        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
584        {
585          indSrv_[rank].push_back(globalIndexTmp[i]);
586        }
587
588        if (writtenInd.count(globalIndexTmp[i]))
589        {
590          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
591        }
592      }
593    }
594
595    connectedServerRank_.clear();
596    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
597      connectedServerRank_.push_back(it->first);
598    }
599
600    if (!indSrv_.empty())
601    {
602      std::map<int, vector<size_t> >::const_iterator itIndSrv  = indSrv_.begin(),
603                                                     iteIndSrv = indSrv_.end();
604      connectedServerRank_.clear();
605      for (; itIndSrv != iteIndSrv; ++itIndSrv)
606        connectedServerRank_.push_back(itIndSrv->first);
607    }
608    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
609  }
610
611  void CAxis::sendNonDistributedValue()
612  {
613    CContext* context = CContext::getCurrent();
614    CContextClient* client = context->client;
615    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
616
617    int zoom_end = global_zoom_begin + global_zoom_n - 1;
618    int nb = 0;
619/*    for (size_t idx = 0; idx < n; ++idx)
620    {
621      size_t globalIndex = begin + idx;
622      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
623    }*/
624
625    int end = (0 == n) ? begin : begin + n - 1;
626    int zoom_size = zoomByIndex() ? global_zoom_index.numElements() : global_zoom_n;
627    for (size_t idx = 0; idx < zoom_size; ++idx)
628    {
629      size_t globalZoomIndex = zoomByIndex() ? global_zoom_index(idx) : global_zoom_begin + idx;
630      if (globalZoomIndex >= begin && globalZoomIndex <= end) ++nb;
631    }
632
633    int nbWritten = 0;
634    if (isCompressible_)
635    {
636      for (int idx = 0; idx < data_index.numElements(); ++idx)
637      {
638        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
639
640        if (ind >= 0 && ind < n && mask(ind))
641        {
642          ind += begin;
643          if (ind >= global_zoom_begin && ind <= zoom_end)
644            ++nbWritten;
645        }
646      }
647    }
648
649    CArray<double,1> val(nb);
650    nb = 0;
651/*    for (size_t idx = 0; idx < n; ++idx)
652    {
653      size_t globalIndex = begin + idx;
654      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
655      {
656        val(nb) = value(idx);
657        ++nb;
658      }
659    }*/
660
661    for (size_t idx = 0; idx < zoom_size; ++idx)
662    {
663      size_t globalZoomIndex = zoomByIndex() ? global_zoom_index(idx) : global_zoom_begin + idx;
664      if (globalZoomIndex >= begin && globalZoomIndex <= end)
665      {
666        val(nb) = value(globalZoomIndex-begin);
667        ++nb;
668      }
669    }
670
671    CArray<int, 1> writtenInd(nbWritten);
672    nbWritten = 0;
673    if (isCompressible_)
674    {
675      for (int idx = 0; idx < data_index.numElements(); ++idx)
676      {
677        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
678
679        if (ind >= 0 && ind < n && mask(ind))
680        {
681          ind += begin;
682          if (ind >= global_zoom_begin && ind <= zoom_end)
683          {
684            writtenInd(nbWritten) = ind;
685            ++nbWritten;
686          }
687        }
688      }
689    }
690
691    if (client->isServerLeader())
692    {
693      std::list<CMessage> msgs;
694
695      const std::list<int>& ranks = client->getRanksServerLeader();
696      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
697      {
698        msgs.push_back(CMessage());
699        CMessage& msg = msgs.back();
700        msg << this->getId();
701        msg << val;
702        if (isCompressible_)
703          msg << writtenInd;
704        event.push(*itRank, 1, msg);
705      }
706      client->sendEvent(event);
707    }
708    else client->sendEvent(event);
709  }
710
711  void CAxis::sendDistributedValue(void)
712  {
713    int ns, n, i, j, ind, nv, idx;
714    CContext* context = CContext::getCurrent();
715    CContextClient* client=context->client;
716
717    // send value for each connected server
718    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
719    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
720
721    list<CMessage> list_msgsIndex, list_msgsVal;
722    list<CArray<int,1> > list_indi;
723    list<CArray<int,1> > list_writtenInd;
724    list<CArray<double,1> > list_val;
725    list<CArray<double,2> > list_bounds;
726    list<CArray<StdString,1> > list_label;
727
728    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
729    iteMap = indSrv_.end();
730    for (int k = 0; k < connectedServerRank_.size(); ++k)
731    {
732      int nbData = 0;
733      int rank = connectedServerRank_[k];
734      it = indSrv_.find(rank);
735      if (iteMap != it)
736        nbData = it->second.size();
737
738      list_indi.push_back(CArray<int,1>(nbData));
739      list_val.push_back(CArray<double,1>(nbData));
740
741      if (hasBounds_)
742      {
743        list_bounds.push_back(CArray<double,2>(2,nbData));
744      }
745     
746      if (hasLabel)
747      {
748        list_label.push_back(CArray<StdString,1>(nbData));
749      }
750
751      CArray<int,1>& indi = list_indi.back();
752      CArray<double,1>& val = list_val.back();
753
754      for (n = 0; n < nbData; ++n)
755      {
756        idx = static_cast<int>(it->second[n]);
757        ind = idx - begin;
758
759        val(n) = value(ind);
760        indi(n) = idx;
761
762        if (hasBounds_)
763        {
764          CArray<double,2>& boundsVal = list_bounds.back();
765          boundsVal(0, n) = bounds(0,n);
766          boundsVal(1, n) = bounds(1,n);
767        }
768       
769        if (hasLabel)
770        {
771          CArray<StdString,1>& labelVal = list_label.back();
772          labelVal(n) = label(n);
773        }
774      }
775
776      list_msgsIndex.push_back(CMessage());
777      list_msgsIndex.back() << this->getId() << list_indi.back();
778
779      if (isCompressible_)
780      {
781        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
782        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
783        CArray<int,1>& writtenInd = list_writtenInd.back();
784
785        for (n = 0; n < writtenInd.numElements(); ++n)
786          writtenInd(n) = writtenIndSrc[n];
787
788        list_msgsIndex.back() << writtenInd;
789      }
790
791      list_msgsVal.push_back(CMessage());
792      list_msgsVal.back() << this->getId() << list_val.back();
793
794      if (hasBounds_)
795      {
796        list_msgsVal.back() << list_bounds.back();
797      }
798 
799      if (hasLabel)
800      {
801        list_msgsVal.back() << list_label.back();
802      }
803
804      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
805      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
806    }
807
808    client->sendEvent(eventIndex);
809    client->sendEvent(eventVal);
810  }
811
812  void CAxis::recvIndex(CEventServer& event)
813  {
814    CAxis* axis;
815
816    list<CEventServer::SSubEvent>::iterator it;
817    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
818    {
819      CBufferIn* buffer = it->buffer;
820      string axisId;
821      *buffer >> axisId;
822      axis = get(axisId);
823      axis->recvIndex(it->rank, *buffer);
824    }
825
826    if (axis->isCompressible_)
827    {
828      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
829
830      CContextServer* server = CContext::getCurrent()->server;
831      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
832      MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
833      MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
834      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
835    }
836  }
837
838  void CAxis::recvIndex(int rank, CBufferIn& buffer)
839  {
840    buffer >> indiSrv_[rank];
841
842    if (isCompressible_)
843    {
844      CArray<int, 1> writtenIndexes;
845      buffer >> writtenIndexes;
846      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
847      for (int i = 0; i < writtenIndexes.numElements(); ++i)
848        indexesToWrite.push_back(writtenIndexes(i));
849    }
850  }
851
852  void CAxis::recvDistributedValue(CEventServer& event)
853  {
854    list<CEventServer::SSubEvent>::iterator it;
855    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
856    {
857      CBufferIn* buffer = it->buffer;
858      string axisId;
859      *buffer >> axisId;
860      get(axisId)->recvDistributedValue(it->rank, *buffer);
861    }
862  }
863
864  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
865  {
866    CArray<int,1> &indi = indiSrv_[rank];
867    CArray<double,1> val;
868    CArray<double,2> boundsVal;
869    CArray<StdString,1> labelVal;
870
871    buffer >> val;
872    if (hasBounds_) buffer >> boundsVal;
873    if (hasLabel) buffer >> labelVal;
874
875    int i, j, ind_srv;
876    for (int ind = 0; ind < indi.numElements(); ++ind)
877    {
878      i = indi(ind);
879      ind_srv = i - zoom_begin_srv;
880      value_srv(ind_srv) = val(ind);
881      if (hasBounds_)
882      {
883        bound_srv(0,ind_srv) = boundsVal(0, ind);
884        bound_srv(1,ind_srv) = boundsVal(1, ind);
885      }
886
887      if (hasLabel)
888      {
889        label_srv(ind_srv) = labelVal( ind);
890      }
891
892    }
893  }
894
895   void CAxis::recvNonDistributedValue(CEventServer& event)
896  {
897    CAxis* axis;
898
899    list<CEventServer::SSubEvent>::iterator it;
900    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
901    {
902      CBufferIn* buffer = it->buffer;
903      string axisId;
904      *buffer >> axisId;
905      axis = get(axisId);
906      axis->recvNonDistributedValue(it->rank, *buffer);
907    }
908
909    if (axis->isCompressible_)
910    {
911      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
912
913      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
914      axis->offsetWrittenIndexes_ = 0;
915    }
916  }
917
918  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
919  {
920    CArray<double,1> val;
921    buffer >> val;
922
923    for (int ind = 0; ind < val.numElements(); ++ind)
924    {
925      value_srv(ind) = val(ind);
926      if (hasBounds_)
927      {
928        bound_srv(0,ind) = bounds(0,ind);
929        bound_srv(1,ind) = bounds(1,ind);
930      }
931      if (hasLabel)
932      {
933        label_srv(ind) = label(ind);
934      }
935    }
936
937    if (isCompressible_)
938    {
939      CArray<int, 1> writtenIndexes;
940      buffer >> writtenIndexes;
941      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
942      for (int i = 0; i < writtenIndexes.numElements(); ++i)
943        indexesToWrite.push_back(writtenIndexes(i));
944    }
945  }
946
947  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
948                                 CServerDistributionDescription::ServerDistributionType distType)
949  {
950    CContext* context = CContext::getCurrent();
951    CContextClient* client = context->client;
952    int nbServer = client->serverSize;
953
954    CServerDistributionDescription serverDescription(globalDim, nbServer);
955    serverDescription.computeServerDistribution();
956
957    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
958    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
959
960    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
961    if (client->isServerLeader())
962    {
963      std::list<CMessage> msgs;
964
965      const std::list<int>& ranks = client->getRanksServerLeader();
966      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
967      {
968        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
969        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
970        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
971        const int end   = begin + ni - 1;
972        const bool zoomIndex = zoomByIndex();
973
974        msgs.push_back(CMessage());
975        CMessage& msg = msgs.back();
976        msg << this->getId();
977        msg << ni << begin << end;
978        msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
979        msg << isCompressible_;       
980        msg << zoomIndex;
981        if (zoomIndex)
982          msg << global_zoom_index.getValue();
983
984        event.push(*itRank,1,msg);
985      }
986      client->sendEvent(event);
987    }
988    else client->sendEvent(event);
989  }
990
991  void CAxis::recvServerAttribut(CEventServer& event)
992  {
993    CBufferIn* buffer = event.subEvents.begin()->buffer;
994    string axisId;
995    *buffer >> axisId;
996    get(axisId)->recvServerAttribut(*buffer);
997  }
998
999  void CAxis::recvServerAttribut(CBufferIn& buffer)
1000  {
1001    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
1002    bool zoomIndex;   
1003    CArray<int,1> zoom_index_recv;
1004    std::vector<int> zoom_index_tmp;
1005    std::vector<int>::iterator itZoomBeginSrv, itZoomEndSrv, itZoomSrv;
1006
1007    buffer >> ni_srv >> begin_srv >> end_srv;
1008    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
1009    buffer >> isCompressible_;
1010    buffer >> zoomIndex;
1011    if (zoomIndex)
1012    {
1013      buffer >> zoom_index_recv;
1014      global_zoom_index.reference(zoom_index_recv);
1015      zoom_index_tmp.resize(global_zoom_index.numElements());
1016      std::copy(global_zoom_index.begin(), global_zoom_index.end(), zoom_index_tmp.begin());
1017      std::sort(zoom_index_tmp.begin(), zoom_index_tmp.end());
1018      itZoomBeginSrv = std::lower_bound(zoom_index_tmp.begin(), zoom_index_tmp.end(), begin_srv);
1019      itZoomEndSrv   = std::upper_bound(zoom_index_tmp.begin(), zoom_index_tmp.end(), end_srv);     
1020      int sz = std::distance(itZoomBeginSrv, itZoomEndSrv);
1021      zoom_index_srv.resize(sz);
1022      itZoomSrv = itZoomBeginSrv;
1023      for (int i = 0; i < sz; ++i, ++itZoomSrv)
1024      {
1025        zoom_index_srv(i) = *(itZoomSrv);
1026      }
1027    }
1028
1029    global_zoom_begin = global_zoom_begin_tmp;
1030    global_zoom_n  = global_zoom_n_tmp;
1031    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
1032
1033    zoom_begin_srv = zoomIndex ? std::distance(itZoomBeginSrv, zoom_index_tmp.begin())
1034                                 : global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
1035    zoom_end_srv   = zoomIndex ? std::distance(zoom_index_tmp.begin(), itZoomEndSrv) - 1 
1036                                 : global_zoom_end < end_srv ? global_zoom_end : end_srv ;
1037    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
1038     
1039    global_zoom_begin_srv = zoomIndex ? 0 : global_zoom_begin ;
1040    global_zoom_size_srv  = zoomIndex ? zoom_index_tmp.size() : global_zoom_n;
1041
1042    if (zoom_size_srv<=0)
1043    {
1044      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
1045    }
1046
1047    if (n_glo == n)
1048    {
1049      zoom_begin_srv = zoomIndex ? std::distance(itZoomBeginSrv, zoom_index_tmp.begin())
1050                                   : global_zoom_begin;     
1051      zoom_size_srv  = zoomIndex ? zoom_index_tmp.size()
1052                                   : global_zoom_n;
1053    }
1054    if (hasValue)
1055    {
1056      value_srv.resize(zoom_size_srv);
1057      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
1058      if (hasLabel)  label_srv.resize(zoom_size_srv);
1059    }
1060  }
1061
1062  /*!
1063    Compare two axis objects.
1064    They are equal if only if they have identical attributes as well as their values.
1065    Moreover, they must have the same transformations.
1066  \param [in] axis Compared axis
1067  \return result of the comparison
1068  */
1069  bool CAxis::isEqual(CAxis* obj)
1070  {
1071    vector<StdString> excludedAttr;
1072    excludedAttr.push_back("axis_ref");
1073
1074    bool objEqual = SuperClass::isEqual(obj, excludedAttr);   
1075    if (!objEqual) return objEqual;
1076
1077    TransMapTypes thisTrans = this->getAllTransformations();
1078    TransMapTypes objTrans  = obj->getAllTransformations();
1079
1080    TransMapTypes::const_iterator it, itb, ite;
1081    std::vector<ETranformationType> thisTransType, objTransType;
1082    for (it = thisTrans.begin(); it != thisTrans.end(); ++it)
1083      thisTransType.push_back(it->first);
1084    for (it = objTrans.begin(); it != objTrans.end(); ++it)
1085      objTransType.push_back(it->first);
1086
1087    if (thisTransType.size() != objTransType.size()) return false;
1088    for (int idx = 0; idx < thisTransType.size(); ++idx)
1089      objEqual &= (thisTransType[idx] == objTransType[idx]);
1090
1091    return objEqual;
1092  }
1093
1094  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
1095  {
1096    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
1097    return transformationMap_.back().second;
1098  }
1099
1100  bool CAxis::hasTransformation()
1101  {
1102    return (!transformationMap_.empty());
1103  }
1104
1105  void CAxis::setTransformations(const TransMapTypes& axisTrans)
1106  {
1107    transformationMap_ = axisTrans;
1108  }
1109
1110  CAxis::TransMapTypes CAxis::getAllTransformations(void)
1111  {
1112    return transformationMap_;
1113  }
1114
1115  void CAxis::duplicateTransformation(CAxis* src)
1116  {
1117    if (src->hasTransformation())
1118    {
1119      this->setTransformations(src->getAllTransformations());
1120    }
1121  }
1122
1123  /*!
1124   * Go through the hierarchy to find the axis from which the transformations must be inherited
1125   */
1126  void CAxis::solveInheritanceTransformation()
1127  {
1128    if (hasTransformation() || !hasDirectAxisReference())
1129      return;
1130
1131    CAxis* axis = this;
1132    std::vector<CAxis*> refAxis;
1133    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
1134    {
1135      refAxis.push_back(axis);
1136      axis = axis->getDirectAxisReference();
1137    }
1138
1139    if (axis->hasTransformation())
1140      for (size_t i = 0; i < refAxis.size(); ++i)
1141        refAxis[i]->setTransformations(axis->getAllTransformations());
1142  }
1143
1144  void CAxis::parse(xml::CXMLNode & node)
1145  {
1146    SuperClass::parse(node);
1147
1148    if (node.goToChildElement())
1149    {
1150      StdString nodeElementName;
1151      do
1152      {
1153        StdString nodeId("");
1154        if (node.getAttributes().end() != node.getAttributes().find("id"))
1155        { nodeId = node.getAttributes()["id"]; }
1156
1157        nodeElementName = node.getElementName();
1158        //std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_.end(), it;
1159        if(transformationMapList_ptr == 0) initializeTransformationMap();
1160        std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_ptr->end(), it;
1161        //it = transformationMapList_.find(nodeElementName);
1162        it = transformationMapList_ptr->find(nodeElementName);
1163        if (ite != it)
1164        {
1165          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1166                                                                                                               nodeId,
1167                                                                                                               &node)));
1168        }
1169        else
1170        {
1171          ERROR("void CAxis::parse(xml::CXMLNode & node)",
1172                << "The transformation " << nodeElementName << " has not been supported yet.");
1173        }
1174      } while (node.goToNextElement()) ;
1175      node.goToParentElement();
1176    }
1177  }
1178
1179  DEFINE_REF_FUNC(Axis,axis)
1180
1181   ///---------------------------------------------------------------
1182
1183} // namespace xios
Note: See TracBrowser for help on using the repository browser.