source: XIOS/dev/branch_yushan_merged/src/node/axis.cpp @ 1205

Last change on this file since 1205 was 1205, checked in by yushan, 7 years ago

branch merged with trunk @1200

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 38.4 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isClientAfterTransformationChecked(false)
27      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
29      , transformationMap_(), hasValue(false), hasLabel(false)
30   {
31   }
32
33   CAxis::CAxis(const StdString & id)
34      : CObjectTemplate<CAxis>(id)
35      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
36      , isClientAfterTransformationChecked(false)
37      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
38      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
39      , transformationMap_(), hasValue(false), hasLabel(false)
40   {
41   }
42
43   CAxis::~CAxis(void)
44   { /* Ne rien faire de plus */ }
45
46   std::map<StdString, ETranformationType> *CAxis::transformationMapList_ptr = 0; //new std::map<StdString, ETranformationType>(); 
47   //bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_ptr);
48
49   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
50   {
51     m["zoom_axis"] = TRANS_ZOOM_AXIS;
52     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
53     m["inverse_axis"] = TRANS_INVERSE_AXIS;
54     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
55     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
56   }
57
58
59   bool CAxis::initializeTransformationMap()
60   {
61     if(CAxis::transformationMapList_ptr == 0) CAxis::transformationMapList_ptr = new std::map<StdString, ETranformationType>();
62     (*CAxis::transformationMapList_ptr)["zoom_axis"]        = TRANS_ZOOM_AXIS;
63     (*CAxis::transformationMapList_ptr)["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
64     (*CAxis::transformationMapList_ptr)["inverse_axis"]     = TRANS_INVERSE_AXIS;
65     (*CAxis::transformationMapList_ptr)["reduce_domain"]    = TRANS_REDUCE_DOMAIN_TO_AXIS;
66     (*CAxis::transformationMapList_ptr)["extract_domain"]   = TRANS_EXTRACT_DOMAIN_TO_AXIS;
67   }
68
69
70   ///---------------------------------------------------------------
71
72   const std::set<StdString> & CAxis::getRelFiles(void) const
73   {
74      return (this->relFiles);
75   }
76
77   bool CAxis::IsWritten(const StdString & filename) const
78   {
79      return (this->relFiles.find(filename) != this->relFiles.end());
80   }
81
82   bool CAxis::isWrittenCompressed(const StdString& filename) const
83   {
84      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
85   }
86
87   bool CAxis::isDistributed(void) const
88   {
89      return isDistributed_;
90   }
91
92   /*!
93    * Test whether the data defined on the axis can be outputted in a compressed way.
94    *
95    * \return true if and only if a mask was defined for this axis
96    */
97   bool CAxis::isCompressible(void) const
98   {
99      return isCompressible_;
100   }
101
102   void CAxis::addRelFile(const StdString & filename)
103   {
104      this->relFiles.insert(filename);
105   }
106
107   void CAxis::addRelFileCompressed(const StdString& filename)
108   {
109      this->relFilesCompressed.insert(filename);
110   }
111
112   //----------------------------------------------------------------
113
114   const std::vector<int>& CAxis::getIndexesToWrite(void) const
115   {
116     return indexesToWrite;
117   }
118
119   /*!
120     Returns the number of indexes written by each server.
121     \return the number of indexes written by each server
122   */
123   int CAxis::getNumberWrittenIndexes() const
124   {
125     return numberWrittenIndexes_;
126   }
127
128   /*!
129     Returns the total number of indexes written by the servers.
130     \return the total number of indexes written by the servers
131   */
132   int CAxis::getTotalNumberWrittenIndexes() const
133   {
134     return totalNumberWrittenIndexes_;
135   }
136
137   /*!
138     Returns the offset of indexes written by each server.
139     \return the offset of indexes written by each server
140   */
141   int CAxis::getOffsetWrittenIndexes() const
142   {
143     return offsetWrittenIndexes_;
144   }
145
146   //----------------------------------------------------------------
147
148   /*!
149    * Compute the minimum buffer size required to send the attributes to the server(s).
150    *
151    * \return A map associating the server rank with its minimum buffer size.
152    */
153   std::map<int, StdSize> CAxis::getAttributesBufferSize()
154   {
155     CContextClient* client = CContext::getCurrent()->client;
156
157     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
158
159     bool isNonDistributed = (n == n_glo);
160
161     if (client->isServerLeader())
162     {
163       // size estimation for sendServerAttribut
164       size_t size = 6 * sizeof(size_t);
165       // size estimation for sendNonDistributedValue
166       if (isNonDistributed)
167         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
168       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
169
170       const std::list<int>& ranks = client->getRanksServerLeader();
171       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
172       {
173         if (size > attributesSizes[*itRank])
174           attributesSizes[*itRank] = size;
175       }
176     }
177
178     if (!isNonDistributed)
179     {
180       // size estimation for sendDistributedValue
181       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
182       for (it = indSrv_.begin(); it != ite; ++it)
183       {
184         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
185         if (isCompressible_)
186           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
187
188         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
189         if (hasBounds_)
190           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
191 
192         if (hasLabel)
193           sizeValEvent += CArray<StdString,1>::size(it->second.size());
194
195         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
196         if (size > attributesSizes[it->first])
197           attributesSizes[it->first] = size;
198       }
199     }
200
201     return attributesSizes;
202   }
203
204   //----------------------------------------------------------------
205
206   StdString CAxis::GetName(void)   { return (StdString("axis")); }
207   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
208   ENodeType CAxis::GetType(void)   { return (eAxis); }
209
210   //----------------------------------------------------------------
211
212   CAxis* CAxis::createAxis()
213   {
214     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
215     return axis;
216   }
217
218   void CAxis::fillInValues(const CArray<double,1>& values)
219   {
220     this->value = values;
221   }
222
223   void CAxis::checkAttributes(void)
224   {
225      if (this->n_glo.isEmpty())
226        ERROR("CAxis::checkAttributes(void)",
227              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
228              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
229      StdSize size = this->n_glo.getValue();
230
231      if (!this->index.isEmpty())
232      {
233        if (n.isEmpty()) n = index.numElements();
234
235        // It's not so correct but if begin is not the first value of index
236        // then data on the local axis has user-defined distribution. In this case, begin has no meaning.
237        if (begin.isEmpty()) begin = index(0);         
238      }
239      else 
240      {
241        if (!this->begin.isEmpty())
242        {
243          if (begin < 0 || begin > size - 1)
244            ERROR("CAxis::checkAttributes(void)",
245                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
246                  << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
247        }
248        else this->begin.setValue(0);
249
250        if (!this->n.isEmpty())
251        {
252          if (n < 0 || n > size)
253            ERROR("CAxis::checkAttributes(void)",
254                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
255                  << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
256        }
257        else this->n.setValue(size);
258
259        {
260          index.resize(n);
261          for (int i = 0; i < n; ++i) index(i) = i+begin;
262        }
263      }
264
265      if (!this->value.isEmpty())
266      {
267        StdSize true_size = value.numElements();
268        if (this->n.getValue() != true_size)
269          ERROR("CAxis::checkAttributes(void)",
270                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
271                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
272        this->hasValue = true;
273      }
274
275      this->checkData();
276      this->checkZoom();
277      this->checkMask();
278      this->checkBounds();
279      this->checkLabel();
280
281      isDistributed_ = (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
282                       (!this->n.isEmpty() && (this->n != this->n_glo));
283
284      // A same stupid condition to make sure that if there is only one client, axis
285      // should be considered to be distributed. This should be a temporary solution     
286      isDistributed_ |= (1 == CContext::getCurrent()->client->clientSize);
287   }
288
289   void CAxis::checkData()
290   {
291      if (data_begin.isEmpty()) data_begin.setValue(0);
292
293      if (data_n.isEmpty())
294      {
295        data_n.setValue(n);
296      }
297      else if (data_n.getValue() < 0)
298      {
299        ERROR("CAxis::checkData(void)",
300              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
301              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
302      }
303
304      if (data_index.isEmpty())
305      {
306        data_index.resize(data_n);
307        for (int i = 0; i < data_n; ++i) data_index(i) = i;
308      }
309   }
310
311   void CAxis::checkZoom(void)
312   {
313     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
314     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
315   }
316
317   void CAxis::checkMask()
318   {
319      if (!mask.isEmpty())
320      {
321         if (mask.extent(0) != n)
322           ERROR("CAxis::checkMask(void)",
323                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
324                 << "The mask does not have the same size as the local domain." << std::endl
325                 << "Local size is " << n.getValue() << "." << std::endl
326                 << "Mask size is " << mask.extent(0) << ".");
327      }
328      else // (mask.isEmpty())
329      { // If no mask was defined, we create a default one without any masked point.
330         mask.resize(n);
331         for (int i = 0; i < n; ++i)
332         {
333           mask(i) = true;
334         }
335      }
336   }
337
338  void CAxis::checkBounds()
339  {
340    if (!bounds.isEmpty())
341    {
342      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
343        ERROR("CAxis::checkAttributes(void)",
344              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
345              << "Axis size is " << n.getValue() << "." << std::endl
346              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
347      hasBounds_ = true;
348    }
349    else hasBounds_ = false;
350  }
351
352  void CAxis::checkLabel()
353  {
354    if (!label.isEmpty())
355    {
356      if (label.extent(0) != n)
357        ERROR("CAxis::checkLabel(void)",
358              << "The label array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension of axis size." << std::endl
359              << "Axis size is " << n.getValue() << "." << std::endl
360              << "label size is "<< label.extent(0)<<  " .");
361      hasLabel = true;
362    }
363    else hasLabel = false;
364  }
365  void CAxis::checkEligibilityForCompressedOutput()
366  {
367    // We don't check if the mask is valid here, just if a mask has been defined at this point.
368    isCompressible_ = !mask.isEmpty();
369  }
370
371  bool CAxis::zoomByIndex()
372  {
373    return (!global_zoom_index.isEmpty() && (0 != global_zoom_index.numElements()));
374  }
375
376  bool CAxis::dispatchEvent(CEventServer& event)
377   {
378      if (SuperClass::dispatchEvent(event)) return true;
379      else
380      {
381        switch(event.type)
382        {
383           case EVENT_ID_SERVER_ATTRIBUT :
384             recvServerAttribut(event);
385             return true;
386             break;
387           case EVENT_ID_INDEX:
388            recvIndex(event);
389            return true;
390            break;
391          case EVENT_ID_DISTRIBUTED_VALUE:
392            recvDistributedValue(event);
393            return true;
394            break;
395          case EVENT_ID_NON_DISTRIBUTED_VALUE:
396            recvNonDistributedValue(event);
397            return true;
398            break;
399           default :
400             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
401                    << "Unknown Event");
402           return false;
403         }
404      }
405   }
406
407   void CAxis::checkAttributesOnClient()
408   {
409     if (this->areClientAttributesChecked_) return;
410
411     this->checkAttributes();
412
413     this->areClientAttributesChecked_ = true;
414   }
415
416   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
417                                                          CServerDistributionDescription::ServerDistributionType distType)
418   {
419     CContext* context=CContext::getCurrent() ;
420
421     if (this->isClientAfterTransformationChecked) return;
422     if (context->hasClient)
423     {
424       if (n.getValue() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
425     }
426
427     this->isClientAfterTransformationChecked = true;
428   }
429
430   // Send all checked attributes to server
431   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
432                                     CServerDistributionDescription::ServerDistributionType distType)
433   {
434     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
435     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
436     CContext* context = CContext::getCurrent();
437
438     if (this->isChecked) return;
439     if (context->hasClient)
440     {
441       sendServerAttribut(globalDim, orderPositionInGrid, distType);
442       if (hasValue) sendValue();
443     }
444
445     this->isChecked = true;
446   }
447
448  void CAxis::sendValue()
449  {
450     if (n.getValue() == n_glo.getValue())
451       sendNonDistributedValue();
452     else
453       sendDistributedValue();
454  }
455
456  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
457                                     CServerDistributionDescription::ServerDistributionType distType)
458  {
459    CContext* context = CContext::getCurrent();
460    CContextClient* client = context->client;
461    int nbServer = client->serverSize;
462    int range, clientSize = client->clientSize;
463    int rank = client->clientRank;
464
465    size_t ni = this->n.getValue();
466    size_t ibegin = this->begin.getValue();
467    size_t zoom_end = global_zoom_begin+global_zoom_n-1;
468    size_t nZoomCount = 0;
469    size_t nbIndex = index.numElements();
470
471    int end = (0 == n) ? begin : begin + n - 1;
472    int zoom_size = zoomByIndex() ? global_zoom_index.numElements() : global_zoom_n;
473    int minInd = min(index);
474    int maxInd = max(index);
475    for (size_t idx = 0; idx < zoom_size; ++idx)
476    {
477      size_t globalZoomIndex = zoomByIndex() ? global_zoom_index(idx) : global_zoom_begin + idx;
478      if (globalZoomIndex >= minInd && globalZoomIndex <= maxInd) ++nZoomCount;
479    }
480
481/*    for (size_t idx = 0; idx < nbIndex; ++idx)
482    {
483      size_t globalIndex = index(idx);
484      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
485    }*/
486   
487    CArray<size_t,1> globalIndexAxis(nbIndex);
488    for (size_t idx = 0; idx < nbIndex; ++idx)
489    {     
490      globalIndexAxis(idx) = (size_t)index(idx);
491    }
492
493    std::vector<size_t> globalAxisZoom(nZoomCount);
494    nZoomCount = 0;
495    for (size_t idx = 0; idx < zoom_size; ++idx)
496    {
497      size_t globalZoomIndex = zoomByIndex() ? global_zoom_index(idx) : global_zoom_begin + idx;
498      if (globalZoomIndex >= minInd && globalZoomIndex <= maxInd)
499      {
500        globalAxisZoom[nZoomCount] = globalZoomIndex;
501        ++nZoomCount;
502      }
503    }
504
505    std::set<int> writtenInd;
506    if (isCompressible_)
507    {
508      for (int idx = 0; idx < data_index.numElements(); ++idx)
509      {
510        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
511
512        if (ind >= 0 && ind < ni && mask(ind))
513        {
514          ind += ibegin;
515          if (ind >= global_zoom_begin && ind <= zoom_end)
516            writtenInd.insert(ind);
517        }
518      }
519    }
520
521    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
522    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
523    CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
524    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
525    {
526      std::vector<int> nGlobAxis(1);
527      nGlobAxis[0] = n_glo.getValue();
528
529      size_t globalSizeIndex = 1, indexBegin, indexEnd;
530      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
531      indexBegin = 0;
532      if (globalSizeIndex <= clientSize)
533      {
534        indexBegin = rank%globalSizeIndex;
535        indexEnd = indexBegin;
536      }
537      else
538      {
539        for (int i = 0; i < clientSize; ++i)
540        {
541          range = globalSizeIndex / clientSize;
542          if (i < (globalSizeIndex%clientSize)) ++range;
543          if (i == client->clientRank) break;
544          indexBegin += range;
545        }
546        indexEnd = indexBegin + range - 1;
547      }
548
549      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
550      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
551      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
552      clientServerMap->computeServerIndexMapping(globalIndexAxis);
553      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
554      delete clientServerMap;
555    }
556    else
557    {
558      std::vector<size_t> globalIndexServer(n_glo.getValue());
559      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
560      {
561        globalIndexServer[idx] = idx;
562      }
563
564      for (int idx = 0; idx < nbServer; ++idx)
565      {
566        globalIndexAxisOnServer[idx] = globalIndexServer;
567      }
568    }
569
570    CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(),
571                                                         ite = globalIndexAxisOnServer.end();
572    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
573                                        iteVec = (globalAxisZoom).end();
574    indSrv_.clear();
575    indWrittenSrv_.clear();
576    for (; it != ite; ++it)
577    {
578      int rank = it->first;
579      const std::vector<size_t>& globalIndexTmp = it->second;
580      int nb = globalIndexTmp.size();
581
582      for (int i = 0; i < nb; ++i)
583      {
584        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
585        {
586          indSrv_[rank].push_back(globalIndexTmp[i]);
587        }
588
589        if (writtenInd.count(globalIndexTmp[i]))
590        {
591          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
592        }
593      }
594    }
595
596    connectedServerRank_.clear();
597    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
598      connectedServerRank_.push_back(it->first);
599    }
600
601    if (!indSrv_.empty())
602    {
603      std::map<int, vector<size_t> >::const_iterator itIndSrv  = indSrv_.begin(),
604                                                     iteIndSrv = indSrv_.end();
605      connectedServerRank_.clear();
606      for (; itIndSrv != iteIndSrv; ++itIndSrv)
607        connectedServerRank_.push_back(itIndSrv->first);
608    }
609    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
610  }
611
612  void CAxis::sendNonDistributedValue()
613  {
614    CContext* context = CContext::getCurrent();
615    CContextClient* client = context->client;
616    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
617
618    int zoom_end = global_zoom_begin + global_zoom_n - 1;
619    int nb = 0;
620/*    for (size_t idx = 0; idx < n; ++idx)
621    {
622      size_t globalIndex = begin + idx;
623      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
624    }*/
625
626    int end = (0 == n) ? begin : begin + n - 1;
627    int zoom_size = zoomByIndex() ? global_zoom_index.numElements() : global_zoom_n;
628    for (size_t idx = 0; idx < zoom_size; ++idx)
629    {
630      size_t globalZoomIndex = zoomByIndex() ? global_zoom_index(idx) : global_zoom_begin + idx;
631      if (globalZoomIndex >= begin && globalZoomIndex <= end) ++nb;
632    }
633
634    int nbWritten = 0;
635    if (isCompressible_)
636    {
637      for (int idx = 0; idx < data_index.numElements(); ++idx)
638      {
639        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
640
641        if (ind >= 0 && ind < n && mask(ind))
642        {
643          ind += begin;
644          if (ind >= global_zoom_begin && ind <= zoom_end)
645            ++nbWritten;
646        }
647      }
648    }
649
650    CArray<double,1> val(nb);
651    nb = 0;
652/*    for (size_t idx = 0; idx < n; ++idx)
653    {
654      size_t globalIndex = begin + idx;
655      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
656      {
657        val(nb) = value(idx);
658        ++nb;
659      }
660    }*/
661
662    for (size_t idx = 0; idx < zoom_size; ++idx)
663    {
664      size_t globalZoomIndex = zoomByIndex() ? global_zoom_index(idx) : global_zoom_begin + idx;
665      if (globalZoomIndex >= begin && globalZoomIndex <= end)
666      {
667        val(nb) = value(globalZoomIndex-begin);
668        ++nb;
669      }
670    }
671
672    CArray<int, 1> writtenInd(nbWritten);
673    nbWritten = 0;
674    if (isCompressible_)
675    {
676      for (int idx = 0; idx < data_index.numElements(); ++idx)
677      {
678        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
679
680        if (ind >= 0 && ind < n && mask(ind))
681        {
682          ind += begin;
683          if (ind >= global_zoom_begin && ind <= zoom_end)
684          {
685            writtenInd(nbWritten) = ind;
686            ++nbWritten;
687          }
688        }
689      }
690    }
691
692    if (client->isServerLeader())
693    {
694      std::list<CMessage> msgs;
695
696      const std::list<int>& ranks = client->getRanksServerLeader();
697      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
698      {
699        msgs.push_back(CMessage());
700        CMessage& msg = msgs.back();
701        msg << this->getId();
702        msg << val;
703        if (isCompressible_)
704          msg << writtenInd;
705        event.push(*itRank, 1, msg);
706      }
707      client->sendEvent(event);
708    }
709    else client->sendEvent(event);
710  }
711
712  void CAxis::sendDistributedValue(void)
713  {
714    int ns, n, i, j, ind, nv, idx;
715    CContext* context = CContext::getCurrent();
716    CContextClient* client=context->client;
717
718    // send value for each connected server
719    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
720    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
721
722    list<CMessage> list_msgsIndex, list_msgsVal;
723    list<CArray<int,1> > list_indi;
724    list<CArray<int,1> > list_writtenInd;
725    list<CArray<double,1> > list_val;
726    list<CArray<double,2> > list_bounds;
727    list<CArray<StdString,1> > list_label;
728
729    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
730    iteMap = indSrv_.end();
731    for (int k = 0; k < connectedServerRank_.size(); ++k)
732    {
733      int nbData = 0;
734      int rank = connectedServerRank_[k];
735      it = indSrv_.find(rank);
736      if (iteMap != it)
737        nbData = it->second.size();
738
739      list_indi.push_back(CArray<int,1>(nbData));
740      list_val.push_back(CArray<double,1>(nbData));
741
742      if (hasBounds_)
743      {
744        list_bounds.push_back(CArray<double,2>(2,nbData));
745      }
746     
747      if (hasLabel)
748      {
749        list_label.push_back(CArray<StdString,1>(nbData));
750      }
751
752      CArray<int,1>& indi = list_indi.back();
753      CArray<double,1>& val = list_val.back();
754
755      for (n = 0; n < nbData; ++n)
756      {
757        idx = static_cast<int>(it->second[n]);
758        ind = idx - begin;
759
760        val(n) = value(ind);
761        indi(n) = idx;
762
763        if (hasBounds_)
764        {
765          CArray<double,2>& boundsVal = list_bounds.back();
766          boundsVal(0, n) = bounds(0,n);
767          boundsVal(1, n) = bounds(1,n);
768        }
769       
770        if (hasLabel)
771        {
772          CArray<StdString,1>& labelVal = list_label.back();
773          labelVal(n) = label(n);
774        }
775      }
776
777      list_msgsIndex.push_back(CMessage());
778      list_msgsIndex.back() << this->getId() << list_indi.back();
779
780      if (isCompressible_)
781      {
782        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
783        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
784        CArray<int,1>& writtenInd = list_writtenInd.back();
785
786        for (n = 0; n < writtenInd.numElements(); ++n)
787          writtenInd(n) = writtenIndSrc[n];
788
789        list_msgsIndex.back() << writtenInd;
790      }
791
792      list_msgsVal.push_back(CMessage());
793      list_msgsVal.back() << this->getId() << list_val.back();
794
795      if (hasBounds_)
796      {
797        list_msgsVal.back() << list_bounds.back();
798      }
799 
800      if (hasLabel)
801      {
802        list_msgsVal.back() << list_label.back();
803      }
804
805      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
806      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
807    }
808
809    client->sendEvent(eventIndex);
810    client->sendEvent(eventVal);
811  }
812
813  void CAxis::recvIndex(CEventServer& event)
814  {
815    CAxis* axis;
816
817    list<CEventServer::SSubEvent>::iterator it;
818    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
819    {
820      CBufferIn* buffer = it->buffer;
821      string axisId;
822      *buffer >> axisId;
823      axis = get(axisId);
824      axis->recvIndex(it->rank, *buffer);
825    }
826
827    if (axis->isCompressible_)
828    {
829      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
830
831      CContextServer* server = CContext::getCurrent()->server;
832      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
833      ep_lib::MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
834      ep_lib::MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
835      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
836    }
837  }
838
839  void CAxis::recvIndex(int rank, CBufferIn& buffer)
840  {
841    buffer >> indiSrv_[rank];
842
843    if (isCompressible_)
844    {
845      CArray<int, 1> writtenIndexes;
846      buffer >> writtenIndexes;
847      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
848      for (int i = 0; i < writtenIndexes.numElements(); ++i)
849        indexesToWrite.push_back(writtenIndexes(i));
850    }
851  }
852
853  void CAxis::recvDistributedValue(CEventServer& event)
854  {
855    list<CEventServer::SSubEvent>::iterator it;
856    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
857    {
858      CBufferIn* buffer = it->buffer;
859      string axisId;
860      *buffer >> axisId;
861      get(axisId)->recvDistributedValue(it->rank, *buffer);
862    }
863  }
864
865  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
866  {
867    CArray<int,1> &indi = indiSrv_[rank];
868    CArray<double,1> val;
869    CArray<double,2> boundsVal;
870    CArray<StdString,1> labelVal;
871
872    buffer >> val;
873    if (hasBounds_) buffer >> boundsVal;
874    if (hasLabel) buffer >> labelVal;
875
876    int i, j, ind_srv;
877    for (int ind = 0; ind < indi.numElements(); ++ind)
878    {
879      i = indi(ind);
880      ind_srv = i - zoom_begin_srv;
881      value_srv(ind_srv) = val(ind);
882      if (hasBounds_)
883      {
884        bound_srv(0,ind_srv) = boundsVal(0, ind);
885        bound_srv(1,ind_srv) = boundsVal(1, ind);
886      }
887
888      if (hasLabel)
889      {
890        label_srv(ind_srv) = labelVal( ind);
891      }
892
893    }
894  }
895
896   void CAxis::recvNonDistributedValue(CEventServer& event)
897  {
898    CAxis* axis;
899
900    list<CEventServer::SSubEvent>::iterator it;
901    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
902    {
903      CBufferIn* buffer = it->buffer;
904      string axisId;
905      *buffer >> axisId;
906      axis = get(axisId);
907      axis->recvNonDistributedValue(it->rank, *buffer);
908    }
909
910    if (axis->isCompressible_)
911    {
912      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
913
914      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
915      axis->offsetWrittenIndexes_ = 0;
916    }
917  }
918
919  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
920  {
921    CArray<double,1> val;
922    buffer >> val;
923
924    for (int ind = 0; ind < val.numElements(); ++ind)
925    {
926      value_srv(ind) = val(ind);
927      if (hasBounds_)
928      {
929        bound_srv(0,ind) = bounds(0,ind);
930        bound_srv(1,ind) = bounds(1,ind);
931      }
932      if (hasLabel)
933      {
934        label_srv(ind) = label(ind);
935      }
936    }
937
938    if (isCompressible_)
939    {
940      CArray<int, 1> writtenIndexes;
941      buffer >> writtenIndexes;
942      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
943      for (int i = 0; i < writtenIndexes.numElements(); ++i)
944        indexesToWrite.push_back(writtenIndexes(i));
945    }
946  }
947
948  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
949                                 CServerDistributionDescription::ServerDistributionType distType)
950  {
951    CContext* context = CContext::getCurrent();
952    CContextClient* client = context->client;
953    int nbServer = client->serverSize;
954
955    CServerDistributionDescription serverDescription(globalDim, nbServer);
956    serverDescription.computeServerDistribution();
957
958    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
959    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
960
961    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
962    if (client->isServerLeader())
963    {
964      std::list<CMessage> msgs;
965
966      const std::list<int>& ranks = client->getRanksServerLeader();
967      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
968      {
969        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
970        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
971        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
972        const int end   = begin + ni - 1;
973        const bool zoomIndex = zoomByIndex();
974
975        msgs.push_back(CMessage());
976        CMessage& msg = msgs.back();
977        msg << this->getId();
978        msg << ni << begin << end;
979        msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
980        msg << isCompressible_;
981        msg << zoomIndex;
982        if (zoomIndex)
983          msg << global_zoom_index.getValue();
984
985        event.push(*itRank,1,msg);
986      }
987      client->sendEvent(event);
988    }
989    else client->sendEvent(event);
990  }
991
992  void CAxis::recvServerAttribut(CEventServer& event)
993  {
994    CBufferIn* buffer = event.subEvents.begin()->buffer;
995    string axisId;
996    *buffer >> axisId;
997    get(axisId)->recvServerAttribut(*buffer);
998  }
999
1000  void CAxis::recvServerAttribut(CBufferIn& buffer)
1001  {
1002    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
1003    bool zoomIndex;   
1004    CArray<int,1> zoom_index_recv;
1005    std::vector<int> zoom_index_tmp;
1006    std::vector<int>::iterator itZoomBeginSrv, itZoomEndSrv, itZoomSrv;
1007
1008    buffer >> ni_srv >> begin_srv >> end_srv;
1009    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
1010    buffer >> isCompressible_;
1011    buffer >> zoomIndex;
1012    if (zoomIndex)
1013    {
1014      buffer >> zoom_index_recv;
1015      global_zoom_index.reference(zoom_index_recv);
1016      zoom_index_tmp.resize(global_zoom_index.numElements());
1017      std::copy(global_zoom_index.begin(), global_zoom_index.end(), zoom_index_tmp.begin());
1018      std::sort(zoom_index_tmp.begin(), zoom_index_tmp.end());
1019      itZoomBeginSrv = std::lower_bound(zoom_index_tmp.begin(), zoom_index_tmp.end(), begin_srv);
1020      itZoomEndSrv   = std::upper_bound(zoom_index_tmp.begin(), zoom_index_tmp.end(), end_srv);     
1021      int sz = std::distance(itZoomBeginSrv, itZoomEndSrv);
1022      zoom_index_srv.resize(sz);
1023      itZoomSrv = itZoomBeginSrv;
1024      for (int i = 0; i < sz; ++i, ++itZoomSrv)
1025      {
1026        zoom_index_srv(i) = *(itZoomSrv);
1027      }
1028    }
1029
1030    global_zoom_begin = global_zoom_begin_tmp;
1031    global_zoom_n  = global_zoom_n_tmp;
1032    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
1033
1034    zoom_begin_srv = zoomIndex ? std::distance(itZoomBeginSrv, zoom_index_tmp.begin())
1035                                 : global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
1036    zoom_end_srv   = zoomIndex ? std::distance(zoom_index_tmp.begin(), itZoomEndSrv) - 1 
1037                                 : global_zoom_end < end_srv ? global_zoom_end : end_srv ;
1038    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
1039     
1040    global_zoom_begin_srv = zoomIndex ? 0 : global_zoom_begin ;
1041    global_zoom_size_srv  = zoomIndex ? zoom_index_tmp.size() : global_zoom_n;
1042
1043    if (zoom_size_srv<=0)
1044    {
1045      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
1046    }
1047
1048    if (n_glo == n)
1049    {
1050      zoom_begin_srv = zoomIndex ? std::distance(itZoomBeginSrv, zoom_index_tmp.begin())
1051                                   : global_zoom_begin;     
1052      zoom_size_srv  = zoomIndex ? zoom_index_tmp.size()
1053                                   : global_zoom_n;
1054    }
1055    if (hasValue)
1056    {
1057      value_srv.resize(zoom_size_srv);
1058      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
1059      if (hasLabel)  label_srv.resize(zoom_size_srv);
1060    }
1061  }
1062
1063  /*!
1064    Compare two axis objects.
1065    They are equal if only if they have identical attributes as well as their values.
1066    Moreover, they must have the same transformations.
1067  \param [in] axis Compared axis
1068  \return result of the comparison
1069  */
1070  bool CAxis::isEqual(CAxis* obj)
1071  {
1072    vector<StdString> excludedAttr;
1073    excludedAttr.push_back("axis_ref");
1074
1075    bool objEqual = SuperClass::isEqual(obj, excludedAttr);   
1076    if (!objEqual) return objEqual;
1077
1078    TransMapTypes thisTrans = this->getAllTransformations();
1079    TransMapTypes objTrans  = obj->getAllTransformations();
1080
1081    TransMapTypes::const_iterator it, itb, ite;
1082    std::vector<ETranformationType> thisTransType, objTransType;
1083    for (it = thisTrans.begin(); it != thisTrans.end(); ++it)
1084      thisTransType.push_back(it->first);
1085    for (it = objTrans.begin(); it != objTrans.end(); ++it)
1086      objTransType.push_back(it->first);
1087
1088    if (thisTransType.size() != objTransType.size()) return false;
1089    for (int idx = 0; idx < thisTransType.size(); ++idx)
1090      objEqual &= (thisTransType[idx] == objTransType[idx]);
1091
1092    return objEqual;
1093  }
1094
1095  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
1096  {
1097    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
1098    return transformationMap_.back().second;
1099  }
1100
1101  bool CAxis::hasTransformation()
1102  {
1103    return (!transformationMap_.empty());
1104  }
1105
1106  void CAxis::setTransformations(const TransMapTypes& axisTrans)
1107  {
1108    transformationMap_ = axisTrans;
1109  }
1110
1111  CAxis::TransMapTypes CAxis::getAllTransformations(void)
1112  {
1113    return transformationMap_;
1114  }
1115
1116  void CAxis::duplicateTransformation(CAxis* src)
1117  {
1118    if (src->hasTransformation())
1119    {
1120      this->setTransformations(src->getAllTransformations());
1121    }
1122  }
1123
1124  /*!
1125   * Go through the hierarchy to find the axis from which the transformations must be inherited
1126   */
1127  void CAxis::solveInheritanceTransformation()
1128  {
1129    if (hasTransformation() || !hasDirectAxisReference())
1130      return;
1131
1132    CAxis* axis = this;
1133    std::vector<CAxis*> refAxis;
1134    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
1135    {
1136      refAxis.push_back(axis);
1137      axis = axis->getDirectAxisReference();
1138    }
1139
1140    if (axis->hasTransformation())
1141      for (size_t i = 0; i < refAxis.size(); ++i)
1142        refAxis[i]->setTransformations(axis->getAllTransformations());
1143  }
1144
1145  void CAxis::parse(xml::CXMLNode & node)
1146  {
1147    SuperClass::parse(node);
1148
1149    if (node.goToChildElement())
1150    {
1151      StdString nodeElementName;
1152      do
1153      {
1154        StdString nodeId("");
1155        if (node.getAttributes().end() != node.getAttributes().find("id"))
1156        { nodeId = node.getAttributes()["id"]; }
1157
1158        nodeElementName = node.getElementName();
1159
1160        if(transformationMapList_ptr == 0) initializeTransformationMap();
1161        //transformationMapList_ptr = new std::map<StdString, ETranformationType>();
1162
1163        std::map<StdString, ETranformationType>::const_iterator ite = (*CAxis::transformationMapList_ptr).end(), it;
1164        it = (*CAxis::transformationMapList_ptr).find(nodeElementName);
1165        if (ite != it)
1166        {
1167          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1168                                                                                                               nodeId,
1169                                                                                                               &node)));
1170        }
1171        else
1172        {
1173          ERROR("void CAxis::parse(xml::CXMLNode & node)",
1174                << "The transformation " << nodeElementName << " has not been supported yet.");
1175        }
1176      } while (node.goToNextElement()) ;
1177      node.goToParentElement();
1178    }
1179  }
1180
1181  DEFINE_REF_FUNC(Axis,axis)
1182
1183   ///---------------------------------------------------------------
1184
1185} // namespace xios
1186
Note: See TracBrowser for help on using the repository browser.