source: XIOS/dev/branch_yushan_merged/src/node/axis.cpp @ 1134

Last change on this file since 1134 was 1134, checked in by yushan, 4 years ago

branch merged with trunk r1130

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 35.5 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isClientAfterTransformationChecked(false)
27      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
29      , transformationMap_(), hasValue(false), hasLabel(false)
30   {
31   }
32
33   CAxis::CAxis(const StdString & id)
34      : CObjectTemplate<CAxis>(id)
35      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
36      , isClientAfterTransformationChecked(false)
37      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
38      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
39      , transformationMap_(), hasValue(false), hasLabel(false)
40   {
41   }
42
43   CAxis::~CAxis(void)
44   { /* Ne rien faire de plus */ }
45
46   std::map<StdString, ETranformationType> *CAxis::transformationMapList_ptr = 0; //new std::map<StdString, ETranformationType>(); 
47   //bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_ptr);
48
49   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
50   {
51     m["zoom_axis"] = TRANS_ZOOM_AXIS;
52     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
53     m["inverse_axis"] = TRANS_INVERSE_AXIS;
54     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
55     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
56   }
57
58
59   bool CAxis::initializeTransformationMap()
60   {
61     if(CAxis::transformationMapList_ptr == 0) CAxis::transformationMapList_ptr = new std::map<StdString, ETranformationType>();
62     (*CAxis::transformationMapList_ptr)["zoom_axis"]        = TRANS_ZOOM_AXIS;
63     (*CAxis::transformationMapList_ptr)["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
64     (*CAxis::transformationMapList_ptr)["inverse_axis"]     = TRANS_INVERSE_AXIS;
65     (*CAxis::transformationMapList_ptr)["reduce_domain"]    = TRANS_REDUCE_DOMAIN_TO_AXIS;
66     (*CAxis::transformationMapList_ptr)["extract_domain"]   = TRANS_EXTRACT_DOMAIN_TO_AXIS;
67   }
68
69
70   ///---------------------------------------------------------------
71
72   const std::set<StdString> & CAxis::getRelFiles(void) const
73   {
74      return (this->relFiles);
75   }
76
77   bool CAxis::IsWritten(const StdString & filename) const
78   {
79      return (this->relFiles.find(filename) != this->relFiles.end());
80   }
81
82   bool CAxis::isWrittenCompressed(const StdString& filename) const
83   {
84      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
85   }
86
87   bool CAxis::isDistributed(void) const
88   {
89      return isDistributed_;
90   }
91
92   /*!
93    * Test whether the data defined on the axis can be outputted in a compressed way.
94    *
95    * \return true if and only if a mask was defined for this axis
96    */
97   bool CAxis::isCompressible(void) const
98   {
99      return isCompressible_;
100   }
101
102   void CAxis::addRelFile(const StdString & filename)
103   {
104      this->relFiles.insert(filename);
105   }
106
107   void CAxis::addRelFileCompressed(const StdString& filename)
108   {
109      this->relFilesCompressed.insert(filename);
110   }
111
112   //----------------------------------------------------------------
113
114   const std::vector<int>& CAxis::getIndexesToWrite(void) const
115   {
116     return indexesToWrite;
117   }
118
119   /*!
120     Returns the number of indexes written by each server.
121     \return the number of indexes written by each server
122   */
123   int CAxis::getNumberWrittenIndexes() const
124   {
125     return numberWrittenIndexes_;
126   }
127
128   /*!
129     Returns the total number of indexes written by the servers.
130     \return the total number of indexes written by the servers
131   */
132   int CAxis::getTotalNumberWrittenIndexes() const
133   {
134     return totalNumberWrittenIndexes_;
135   }
136
137   /*!
138     Returns the offset of indexes written by each server.
139     \return the offset of indexes written by each server
140   */
141   int CAxis::getOffsetWrittenIndexes() const
142   {
143     return offsetWrittenIndexes_;
144   }
145
146   //----------------------------------------------------------------
147
148   /*!
149    * Compute the minimum buffer size required to send the attributes to the server(s).
150    *
151    * \return A map associating the server rank with its minimum buffer size.
152    */
153   std::map<int, StdSize> CAxis::getAttributesBufferSize()
154   {
155     CContextClient* client = CContext::getCurrent()->client;
156
157     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
158
159     bool isNonDistributed = (n == n_glo);
160
161     if (client->isServerLeader())
162     {
163       // size estimation for sendServerAttribut
164       size_t size = 6 * sizeof(size_t);
165       // size estimation for sendNonDistributedValue
166       if (isNonDistributed)
167         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
168       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
169
170       const std::list<int>& ranks = client->getRanksServerLeader();
171       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
172       {
173         if (size > attributesSizes[*itRank])
174           attributesSizes[*itRank] = size;
175       }
176     }
177
178     if (!isNonDistributed)
179     {
180       // size estimation for sendDistributedValue
181       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
182       for (it = indSrv_.begin(); it != ite; ++it)
183       {
184         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
185         if (isCompressible_)
186           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
187
188         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
189         if (hasBounds_)
190           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
191 
192         if (hasLabel)
193           sizeValEvent += CArray<StdString,1>::size(it->second.size());
194
195         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
196         if (size > attributesSizes[it->first])
197           attributesSizes[it->first] = size;
198       }
199     }
200
201     return attributesSizes;
202   }
203
204   //----------------------------------------------------------------
205
206   StdString CAxis::GetName(void)   { return (StdString("axis")); }
207   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
208   ENodeType CAxis::GetType(void)   { return (eAxis); }
209
210   //----------------------------------------------------------------
211
212   CAxis* CAxis::createAxis()
213   {
214     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
215     return axis;
216   }
217
218   void CAxis::fillInValues(const CArray<double,1>& values)
219   {
220     this->value = values;
221   }
222
223   void CAxis::checkAttributes(void)
224   {
225      if (this->n_glo.isEmpty())
226        ERROR("CAxis::checkAttributes(void)",
227              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
228              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
229      StdSize size = this->n_glo.getValue();
230
231      if (!this->index.isEmpty())
232      {
233        if (n.isEmpty()) n = index.numElements();
234
235        // It's not so correct but if begin is not the first value of index
236        // then data on the local axis has user-defined distribution. In this case, begin has no meaning.
237        if (begin.isEmpty()) begin = index(0);         
238      }
239      else 
240      {
241        if (!this->begin.isEmpty())
242        {
243          if (begin < 0 || begin > size - 1)
244            ERROR("CAxis::checkAttributes(void)",
245                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
246                  << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
247        }
248        else this->begin.setValue(0);
249
250        if (!this->n.isEmpty())
251        {
252          if (n < 0 || n > size)
253            ERROR("CAxis::checkAttributes(void)",
254                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
255                  << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
256        }
257        else this->n.setValue(size);
258
259        {
260          index.resize(n);
261          for (int i = 0; i < n; ++i) index(i) = i+begin;
262        }
263      }
264
265      if (!this->value.isEmpty())
266      {
267        StdSize true_size = value.numElements();
268        if (this->n.getValue() != true_size)
269          ERROR("CAxis::checkAttributes(void)",
270                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
271                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
272        this->hasValue = true;
273      }
274
275      this->checkData();
276      this->checkZoom();
277      this->checkMask();
278      this->checkBounds();
279      this->checkLabel();
280
281      isDistributed_ = (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
282                       (!this->n.isEmpty() && (this->n != this->n_glo));
283
284      // A same stupid condition to make sure that if there is only one client, axis
285      // should be considered to be distributed. This should be a temporary solution     
286      isDistributed_ |= (1 == CContext::getCurrent()->client->clientSize);
287   }
288
289   void CAxis::checkData()
290   {
291      if (data_begin.isEmpty()) data_begin.setValue(0);
292
293      if (data_n.isEmpty())
294      {
295        data_n.setValue(n);
296      }
297      else if (data_n.getValue() < 0)
298      {
299        ERROR("CAxis::checkData(void)",
300              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
301              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
302      }
303
304      if (data_index.isEmpty())
305      {
306        data_index.resize(data_n);
307        for (int i = 0; i < data_n; ++i) data_index(i) = i;
308      }
309   }
310
311   void CAxis::checkZoom(void)
312   {
313     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
314     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
315   }
316
317   void CAxis::checkMask()
318   {
319      if (!mask.isEmpty())
320      {
321         if (mask.extent(0) != n)
322           ERROR("CAxis::checkMask(void)",
323                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
324                 << "The mask does not have the same size as the local domain." << std::endl
325                 << "Local size is " << n.getValue() << "." << std::endl
326                 << "Mask size is " << mask.extent(0) << ".");
327      }
328      else // (mask.isEmpty())
329      { // If no mask was defined, we create a default one without any masked point.
330         mask.resize(n);
331         for (int i = 0; i < n; ++i)
332         {
333           mask(i) = true;
334         }
335      }
336   }
337
338  void CAxis::checkBounds()
339  {
340    if (!bounds.isEmpty())
341    {
342      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
343        ERROR("CAxis::checkAttributes(void)",
344              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
345              << "Axis size is " << n.getValue() << "." << std::endl
346              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
347      hasBounds_ = true;
348    }
349    else hasBounds_ = false;
350  }
351
352  void CAxis::checkLabel()
353  {
354    if (!label.isEmpty())
355    {
356      if (label.extent(0) != n)
357        ERROR("CAxis::checkLabel(void)",
358              << "The label array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension of axis size." << std::endl
359              << "Axis size is " << n.getValue() << "." << std::endl
360              << "label size is "<< label.extent(0)<<  " .");
361      hasLabel = true;
362    }
363    else hasLabel = false;
364  }
365  void CAxis::checkEligibilityForCompressedOutput()
366  {
367    // We don't check if the mask is valid here, just if a mask has been defined at this point.
368    isCompressible_ = !mask.isEmpty();
369  }
370
371  bool CAxis::dispatchEvent(CEventServer& event)
372   {
373      if (SuperClass::dispatchEvent(event)) return true;
374      else
375      {
376        switch(event.type)
377        {
378           case EVENT_ID_SERVER_ATTRIBUT :
379             recvServerAttribut(event);
380             return true;
381             break;
382           case EVENT_ID_INDEX:
383            recvIndex(event);
384            return true;
385            break;
386          case EVENT_ID_DISTRIBUTED_VALUE:
387            recvDistributedValue(event);
388            return true;
389            break;
390          case EVENT_ID_NON_DISTRIBUTED_VALUE:
391            recvNonDistributedValue(event);
392            return true;
393            break;
394           default :
395             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
396                    << "Unknown Event");
397           return false;
398         }
399      }
400   }
401
402   void CAxis::checkAttributesOnClient()
403   {
404     if (this->areClientAttributesChecked_) return;
405
406     this->checkAttributes();
407
408     this->areClientAttributesChecked_ = true;
409   }
410
411   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
412                                                          CServerDistributionDescription::ServerDistributionType distType)
413   {
414     CContext* context=CContext::getCurrent() ;
415
416     if (this->isClientAfterTransformationChecked) return;
417     if (context->hasClient)
418     {
419       if (n.getValue() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
420     }
421
422     this->isClientAfterTransformationChecked = true;
423   }
424
425   // Send all checked attributes to server
426   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
427                                     CServerDistributionDescription::ServerDistributionType distType)
428   {
429     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
430     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
431     CContext* context = CContext::getCurrent();
432
433     if (this->isChecked) return;
434     if (context->hasClient)
435     {
436       sendServerAttribut(globalDim, orderPositionInGrid, distType);
437       if (hasValue) sendValue();
438     }
439
440     this->isChecked = true;
441   }
442
443  void CAxis::sendValue()
444  {
445     if (n.getValue() == n_glo.getValue())
446       sendNonDistributedValue();
447     else
448       sendDistributedValue();
449  }
450
451  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
452                                     CServerDistributionDescription::ServerDistributionType distType)
453  {
454    CContext* context = CContext::getCurrent();
455    CContextClient* client = context->client;
456    int nbServer = client->serverSize;
457    int range, clientSize = client->clientSize;
458    int rank = client->clientRank;
459
460    size_t ni = this->n.getValue();
461    size_t ibegin = this->begin.getValue();
462    size_t zoom_end = global_zoom_begin+global_zoom_n-1;
463    size_t nZoomCount = 0;
464    size_t nbIndex = index.numElements();
465    for (size_t idx = 0; idx < nbIndex; ++idx)
466    {
467      size_t globalIndex = index(idx);
468      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
469    }
470
471    CArray<size_t,1> globalIndexAxis(nbIndex);
472    std::vector<size_t> globalAxisZoom(nZoomCount);
473    nZoomCount = 0;
474    for (size_t idx = 0; idx < nbIndex; ++idx)
475    {
476      size_t globalIndex = index(idx);
477      globalIndexAxis(idx) = globalIndex;
478      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
479      {
480        globalAxisZoom[nZoomCount] = globalIndex;
481        ++nZoomCount;
482      }
483    }
484
485    std::set<int> writtenInd;
486    if (isCompressible_)
487    {
488      for (int idx = 0; idx < data_index.numElements(); ++idx)
489      {
490        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
491
492        if (ind >= 0 && ind < ni && mask(ind))
493        {
494          ind += ibegin;
495          if (ind >= global_zoom_begin && ind <= zoom_end)
496            writtenInd.insert(ind);
497        }
498      }
499    }
500
501    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
502    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
503    CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
504    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
505    {
506      std::vector<int> nGlobAxis(1);
507      nGlobAxis[0] = n_glo.getValue();
508
509      size_t globalSizeIndex = 1, indexBegin, indexEnd;
510      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
511      indexBegin = 0;
512      if (globalSizeIndex <= clientSize)
513      {
514        indexBegin = rank%globalSizeIndex;
515        indexEnd = indexBegin;
516      }
517      else
518      {
519        for (int i = 0; i < clientSize; ++i)
520        {
521          range = globalSizeIndex / clientSize;
522          if (i < (globalSizeIndex%clientSize)) ++range;
523          if (i == client->clientRank) break;
524          indexBegin += range;
525        }
526        indexEnd = indexBegin + range - 1;
527      }
528
529      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
530      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
531      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
532      clientServerMap->computeServerIndexMapping(globalIndexAxis);
533      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
534      delete clientServerMap;
535    }
536    else
537    {
538      std::vector<size_t> globalIndexServer(n_glo.getValue());
539      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
540      {
541        globalIndexServer[idx] = idx;
542      }
543
544      for (int idx = 0; idx < nbServer; ++idx)
545      {
546        globalIndexAxisOnServer[idx] = globalIndexServer;
547      }
548    }
549
550    CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(),
551                                                         ite = globalIndexAxisOnServer.end();
552    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
553                                        iteVec = (globalAxisZoom).end();
554    indSrv_.clear();
555    indWrittenSrv_.clear();
556    for (; it != ite; ++it)
557    {
558      int rank = it->first;
559      const std::vector<size_t>& globalIndexTmp = it->second;
560      int nb = globalIndexTmp.size();
561
562      for (int i = 0; i < nb; ++i)
563      {
564        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
565        {
566          indSrv_[rank].push_back(globalIndexTmp[i]);
567        }
568
569        if (writtenInd.count(globalIndexTmp[i]))
570        {
571          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
572        }
573      }
574    }
575
576    connectedServerRank_.clear();
577    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
578      connectedServerRank_.push_back(it->first);
579    }
580
581    if (!indSrv_.empty())
582    {
583      std::map<int, vector<size_t> >::const_iterator itIndSrv  = indSrv_.begin(),
584                                                     iteIndSrv = indSrv_.end();
585      connectedServerRank_.clear();
586      for (; itIndSrv != iteIndSrv; ++itIndSrv)
587        connectedServerRank_.push_back(itIndSrv->first);
588    }
589    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
590  }
591
592  void CAxis::sendNonDistributedValue()
593  {
594    CContext* context = CContext::getCurrent();
595    CContextClient* client = context->client;
596    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
597
598    int zoom_end = global_zoom_begin + global_zoom_n - 1;
599    int nb = 0;
600    for (size_t idx = 0; idx < n; ++idx)
601    {
602      size_t globalIndex = begin + idx;
603      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
604    }
605
606    int nbWritten = 0;
607    if (isCompressible_)
608    {
609      for (int idx = 0; idx < data_index.numElements(); ++idx)
610      {
611        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
612
613        if (ind >= 0 && ind < n && mask(ind))
614        {
615          ind += begin;
616          if (ind >= global_zoom_begin && ind <= zoom_end)
617            ++nbWritten;
618        }
619      }
620    }
621
622    CArray<double,1> val(nb);
623    nb = 0;
624    for (size_t idx = 0; idx < n; ++idx)
625    {
626      size_t globalIndex = begin + idx;
627      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
628      {
629        val(nb) = value(idx);
630        ++nb;
631      }
632    }
633
634    CArray<int, 1> writtenInd(nbWritten);
635    nbWritten = 0;
636    if (isCompressible_)
637    {
638      for (int idx = 0; idx < data_index.numElements(); ++idx)
639      {
640        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
641
642        if (ind >= 0 && ind < n && mask(ind))
643        {
644          ind += begin;
645          if (ind >= global_zoom_begin && ind <= zoom_end)
646          {
647            writtenInd(nbWritten) = ind;
648            ++nbWritten;
649          }
650        }
651      }
652    }
653
654    if (client->isServerLeader())
655    {
656      std::list<CMessage> msgs;
657
658      const std::list<int>& ranks = client->getRanksServerLeader();
659      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
660      {
661        msgs.push_back(CMessage());
662        CMessage& msg = msgs.back();
663        msg << this->getId();
664        msg << val;
665        if (isCompressible_)
666          msg << writtenInd;
667        event.push(*itRank, 1, msg);
668      }
669      client->sendEvent(event);
670    }
671    else client->sendEvent(event);
672  }
673
674  void CAxis::sendDistributedValue(void)
675  {
676    int ns, n, i, j, ind, nv, idx;
677    CContext* context = CContext::getCurrent();
678    CContextClient* client=context->client;
679
680    // send value for each connected server
681    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
682    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
683
684    list<CMessage> list_msgsIndex, list_msgsVal;
685    list<CArray<int,1> > list_indi;
686    list<CArray<int,1> > list_writtenInd;
687    list<CArray<double,1> > list_val;
688    list<CArray<double,2> > list_bounds;
689    list<CArray<StdString,1> > list_label;
690
691    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
692    iteMap = indSrv_.end();
693    for (int k = 0; k < connectedServerRank_.size(); ++k)
694    {
695      int nbData = 0;
696      int rank = connectedServerRank_[k];
697      it = indSrv_.find(rank);
698      if (iteMap != it)
699        nbData = it->second.size();
700
701      list_indi.push_back(CArray<int,1>(nbData));
702      list_val.push_back(CArray<double,1>(nbData));
703
704      if (hasBounds_)
705      {
706        list_bounds.push_back(CArray<double,2>(2,nbData));
707      }
708     
709      if (hasLabel)
710      {
711        list_label.push_back(CArray<StdString,1>(nbData));
712      }
713
714      CArray<int,1>& indi = list_indi.back();
715      CArray<double,1>& val = list_val.back();
716
717      for (n = 0; n < nbData; ++n)
718      {
719        idx = static_cast<int>(it->second[n]);
720        ind = idx - begin;
721
722        val(n) = value(ind);
723        indi(n) = idx;
724
725        if (hasBounds_)
726        {
727          CArray<double,2>& boundsVal = list_bounds.back();
728          boundsVal(0, n) = bounds(0,n);
729          boundsVal(1, n) = bounds(1,n);
730        }
731       
732        if (hasLabel)
733        {
734          CArray<StdString,1>& labelVal = list_label.back();
735          labelVal(n) = label(n);
736        }
737      }
738
739      list_msgsIndex.push_back(CMessage());
740      list_msgsIndex.back() << this->getId() << list_indi.back();
741
742      if (isCompressible_)
743      {
744        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
745        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
746        CArray<int,1>& writtenInd = list_writtenInd.back();
747
748        for (n = 0; n < writtenInd.numElements(); ++n)
749          writtenInd(n) = writtenIndSrc[n];
750
751        list_msgsIndex.back() << writtenInd;
752      }
753
754      list_msgsVal.push_back(CMessage());
755      list_msgsVal.back() << this->getId() << list_val.back();
756
757      if (hasBounds_)
758      {
759        list_msgsVal.back() << list_bounds.back();
760      }
761 
762      if (hasLabel)
763      {
764        list_msgsVal.back() << list_label.back();
765      }
766
767      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
768      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
769    }
770
771    client->sendEvent(eventIndex);
772    client->sendEvent(eventVal);
773  }
774
775  void CAxis::recvIndex(CEventServer& event)
776  {
777    CAxis* axis;
778
779    list<CEventServer::SSubEvent>::iterator it;
780    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
781    {
782      CBufferIn* buffer = it->buffer;
783      string axisId;
784      *buffer >> axisId;
785      axis = get(axisId);
786      axis->recvIndex(it->rank, *buffer);
787    }
788
789    if (axis->isCompressible_)
790    {
791      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
792
793      CContextServer* server = CContext::getCurrent()->server;
794      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
795      ep_lib::MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
796      ep_lib::MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
797      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
798    }
799  }
800
801  void CAxis::recvIndex(int rank, CBufferIn& buffer)
802  {
803    buffer >> indiSrv_[rank];
804
805    if (isCompressible_)
806    {
807      CArray<int, 1> writtenIndexes;
808      buffer >> writtenIndexes;
809      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
810      for (int i = 0; i < writtenIndexes.numElements(); ++i)
811        indexesToWrite.push_back(writtenIndexes(i));
812    }
813  }
814
815  void CAxis::recvDistributedValue(CEventServer& event)
816  {
817    list<CEventServer::SSubEvent>::iterator it;
818    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
819    {
820      CBufferIn* buffer = it->buffer;
821      string axisId;
822      *buffer >> axisId;
823      get(axisId)->recvDistributedValue(it->rank, *buffer);
824    }
825  }
826
827  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
828  {
829    CArray<int,1> &indi = indiSrv_[rank];
830    CArray<double,1> val;
831    CArray<double,2> boundsVal;
832    CArray<StdString,1> labelVal;
833
834    buffer >> val;
835    if (hasBounds_) buffer >> boundsVal;
836    if (hasLabel) buffer >> labelVal;
837
838    int i, j, ind_srv;
839    for (int ind = 0; ind < indi.numElements(); ++ind)
840    {
841      i = indi(ind);
842      ind_srv = i - zoom_begin_srv;
843      value_srv(ind_srv) = val(ind);
844      if (hasBounds_)
845      {
846        bound_srv(0,ind_srv) = boundsVal(0, ind);
847        bound_srv(1,ind_srv) = boundsVal(1, ind);
848      }
849
850      if (hasLabel)
851      {
852        label_srv(ind_srv) = labelVal( ind);
853      }
854
855    }
856  }
857
858   void CAxis::recvNonDistributedValue(CEventServer& event)
859  {
860    CAxis* axis;
861
862    list<CEventServer::SSubEvent>::iterator it;
863    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
864    {
865      CBufferIn* buffer = it->buffer;
866      string axisId;
867      *buffer >> axisId;
868      axis = get(axisId);
869      axis->recvNonDistributedValue(it->rank, *buffer);
870    }
871
872    if (axis->isCompressible_)
873    {
874      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
875
876      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
877      axis->offsetWrittenIndexes_ = 0;
878    }
879  }
880
881  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
882  {
883    CArray<double,1> val;
884    buffer >> val;
885
886    for (int ind = 0; ind < val.numElements(); ++ind)
887    {
888      value_srv(ind) = val(ind);
889      if (hasBounds_)
890      {
891        bound_srv(0,ind) = bounds(0,ind);
892        bound_srv(1,ind) = bounds(1,ind);
893      }
894      if (hasLabel)
895      {
896        label_srv(ind) = label(ind);
897      }
898    }
899
900    if (isCompressible_)
901    {
902      CArray<int, 1> writtenIndexes;
903      buffer >> writtenIndexes;
904      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
905      for (int i = 0; i < writtenIndexes.numElements(); ++i)
906        indexesToWrite.push_back(writtenIndexes(i));
907    }
908  }
909
910  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
911                                 CServerDistributionDescription::ServerDistributionType distType)
912  {
913    CContext* context = CContext::getCurrent();
914    CContextClient* client = context->client;
915    int nbServer = client->serverSize;
916
917    CServerDistributionDescription serverDescription(globalDim, nbServer);
918    serverDescription.computeServerDistribution();
919
920    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
921    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
922
923    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
924    if (client->isServerLeader())
925    {
926      std::list<CMessage> msgs;
927
928      const std::list<int>& ranks = client->getRanksServerLeader();
929      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
930      {
931        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
932        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
933        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
934        const int end   = begin + ni - 1;
935
936        msgs.push_back(CMessage());
937        CMessage& msg = msgs.back();
938        msg << this->getId();
939        msg << ni << begin << end;
940        msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
941        msg << isCompressible_;
942
943        event.push(*itRank,1,msg);
944      }
945      client->sendEvent(event);
946    }
947    else client->sendEvent(event);
948  }
949
950  void CAxis::recvServerAttribut(CEventServer& event)
951  {
952    CBufferIn* buffer = event.subEvents.begin()->buffer;
953    string axisId;
954    *buffer >> axisId;
955    get(axisId)->recvServerAttribut(*buffer);
956  }
957
958  void CAxis::recvServerAttribut(CBufferIn& buffer)
959  {
960    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
961
962    buffer >> ni_srv >> begin_srv >> end_srv;
963    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
964    buffer >> isCompressible_;
965    global_zoom_begin = global_zoom_begin_tmp;
966    global_zoom_n  = global_zoom_n_tmp;
967    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
968
969    zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
970    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
971    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
972
973    if (zoom_size_srv<=0)
974    {
975      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
976    }
977
978    if (n_glo == n)
979    {
980      zoom_begin_srv = global_zoom_begin;
981      zoom_end_srv   = global_zoom_end; //zoom_end;
982      zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
983    }
984    if (hasValue)
985    {
986      value_srv.resize(zoom_size_srv);
987      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
988      if (hasLabel)  label_srv.resize(zoom_size_srv);
989    }
990  }
991
992  /*!
993    Compare two axis objects.
994    They are equal if only if they have identical attributes as well as their values.
995    Moreover, they must have the same transformations.
996  \param [in] axis Compared axis
997  \return result of the comparison
998  */
999  bool CAxis::isEqual(CAxis* obj)
1000  {
1001    vector<StdString> excludedAttr;
1002    excludedAttr.push_back("axis_ref");
1003
1004    bool objEqual = SuperClass::isEqual(obj, excludedAttr);   
1005    if (!objEqual) return objEqual;
1006
1007    TransMapTypes thisTrans = this->getAllTransformations();
1008    TransMapTypes objTrans  = obj->getAllTransformations();
1009
1010    TransMapTypes::const_iterator it, itb, ite;
1011    std::vector<ETranformationType> thisTransType, objTransType;
1012    for (it = thisTrans.begin(); it != thisTrans.end(); ++it)
1013      thisTransType.push_back(it->first);
1014    for (it = objTrans.begin(); it != objTrans.end(); ++it)
1015      objTransType.push_back(it->first);
1016
1017    if (thisTransType.size() != objTransType.size()) return false;
1018    for (int idx = 0; idx < thisTransType.size(); ++idx)
1019      objEqual &= (thisTransType[idx] == objTransType[idx]);
1020
1021    return objEqual;
1022  }
1023
1024  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
1025  {
1026    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
1027    return transformationMap_.back().second;
1028  }
1029
1030  bool CAxis::hasTransformation()
1031  {
1032    return (!transformationMap_.empty());
1033  }
1034
1035  void CAxis::setTransformations(const TransMapTypes& axisTrans)
1036  {
1037    transformationMap_ = axisTrans;
1038  }
1039
1040  CAxis::TransMapTypes CAxis::getAllTransformations(void)
1041  {
1042    return transformationMap_;
1043  }
1044
1045
1046
1047  void CAxis::duplicateTransformation(CAxis* src)
1048  {
1049    if (src->hasTransformation())
1050    {
1051      this->setTransformations(src->getAllTransformations());
1052    }
1053  }
1054
1055  /*!
1056   * Go through the hierarchy to find the axis from which the transformations must be inherited
1057   */
1058  void CAxis::solveInheritanceTransformation()
1059  {
1060    if (hasTransformation() || !hasDirectAxisReference())
1061      return;
1062
1063    CAxis* axis = this;
1064    std::vector<CAxis*> refAxis;
1065    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
1066    {
1067      refAxis.push_back(axis);
1068      axis = axis->getDirectAxisReference();
1069    }
1070
1071    if (axis->hasTransformation())
1072      for (size_t i = 0; i < refAxis.size(); ++i)
1073        refAxis[i]->setTransformations(axis->getAllTransformations());
1074  }
1075
1076  void CAxis::parse(xml::CXMLNode & node)
1077  {
1078    SuperClass::parse(node);
1079
1080    if (node.goToChildElement())
1081    {
1082      StdString nodeElementName;
1083      do
1084      {
1085        StdString nodeId("");
1086        if (node.getAttributes().end() != node.getAttributes().find("id"))
1087        { nodeId = node.getAttributes()["id"]; }
1088
1089        nodeElementName = node.getElementName();
1090
1091        if(transformationMapList_ptr == 0) initializeTransformationMap();
1092        //transformationMapList_ptr = new std::map<StdString, ETranformationType>();
1093
1094        std::map<StdString, ETranformationType>::const_iterator ite = (*CAxis::transformationMapList_ptr).end(), it;
1095        it = (*CAxis::transformationMapList_ptr).find(nodeElementName);
1096        if (ite != it)
1097        {
1098          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1099                                                                                                               nodeId,
1100                                                                                                               &node)));
1101        }
1102        else
1103        {
1104          ERROR("void CAxis::parse(xml::CXMLNode & node)",
1105                << "The transformation " << nodeElementName << " has not been supported yet.");
1106        }
1107      } while (node.goToNextElement()) ;
1108      node.goToParentElement();
1109    }
1110  }
1111
1112  DEFINE_REF_FUNC(Axis,axis)
1113
1114   ///---------------------------------------------------------------
1115
1116} // namespace xios
1117
Note: See TracBrowser for help on using the repository browser.