source: XIOS/trunk/src/node/axis.cpp @ 964

Last change on this file since 964 was 964, checked in by mhnguyen, 6 years ago

Ticket 108: Fixing distribution condition of an axis

+) Make sure an axis having n == n_glo be non-distributed

Test
+) OK

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 31.9 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isClientAfterTransformationChecked(false)
27      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
29      , transformationMap_(), hasValue(false)
30   {
31   }
32
33   CAxis::CAxis(const StdString & id)
34      : CObjectTemplate<CAxis>(id)
35      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
36      , isClientAfterTransformationChecked(false)
37      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
38      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
39      , transformationMap_(), hasValue(false)
40   {
41   }
42
43   CAxis::~CAxis(void)
44   { /* Ne rien faire de plus */ }
45
46   std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
47   bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_);
48   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
49   {
50     m["zoom_axis"] = TRANS_ZOOM_AXIS;
51     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
52     m["inverse_axis"] = TRANS_INVERSE_AXIS;
53     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
54     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
55   }
56
57   ///---------------------------------------------------------------
58
59   const std::set<StdString> & CAxis::getRelFiles(void) const
60   {
61      return (this->relFiles);
62   }
63
64   bool CAxis::IsWritten(const StdString & filename) const
65   {
66      return (this->relFiles.find(filename) != this->relFiles.end());
67   }
68
69   bool CAxis::isWrittenCompressed(const StdString& filename) const
70   {
71      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
72   }
73
74   bool CAxis::isDistributed(void) const
75   {
76      return isDistributed_;
77   }
78
79   /*!
80    * Test whether the data defined on the axis can be outputted in a compressed way.
81    *
82    * \return true if and only if a mask was defined for this axis
83    */
84   bool CAxis::isCompressible(void) const
85   {
86      return isCompressible_;
87   }
88
89   void CAxis::addRelFile(const StdString & filename)
90   {
91      this->relFiles.insert(filename);
92   }
93
94   void CAxis::addRelFileCompressed(const StdString& filename)
95   {
96      this->relFilesCompressed.insert(filename);
97   }
98
99   //----------------------------------------------------------------
100
101   const std::vector<int>& CAxis::getIndexesToWrite(void) const
102   {
103     return indexesToWrite;
104   }
105
106   /*!
107     Returns the number of indexes written by each server.
108     \return the number of indexes written by each server
109   */
110   int CAxis::getNumberWrittenIndexes() const
111   {
112     return numberWrittenIndexes_;
113   }
114
115   /*!
116     Returns the total number of indexes written by the servers.
117     \return the total number of indexes written by the servers
118   */
119   int CAxis::getTotalNumberWrittenIndexes() const
120   {
121     return totalNumberWrittenIndexes_;
122   }
123
124   /*!
125     Returns the offset of indexes written by each server.
126     \return the offset of indexes written by each server
127   */
128   int CAxis::getOffsetWrittenIndexes() const
129   {
130     return offsetWrittenIndexes_;
131   }
132
133   //----------------------------------------------------------------
134
135   /*!
136    * Compute the minimum buffer size required to send the attributes to the server(s).
137    *
138    * \return A map associating the server rank with its minimum buffer size.
139    */
140   std::map<int, StdSize> CAxis::getAttributesBufferSize()
141   {
142     CContextClient* client = CContext::getCurrent()->client;
143
144     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
145
146     bool isNonDistributed = (n == n_glo);
147
148     if (client->isServerLeader())
149     {
150       // size estimation for sendServerAttribut
151       size_t size = 6 * sizeof(size_t);
152       // size estimation for sendNonDistributedValue
153       if (isNonDistributed)
154         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
155       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
156
157       const std::list<int>& ranks = client->getRanksServerLeader();
158       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
159       {
160         if (size > attributesSizes[*itRank])
161           attributesSizes[*itRank] = size;
162       }
163     }
164
165     if (!isNonDistributed)
166     {
167       // size estimation for sendDistributedValue
168       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
169       for (it = indSrv_.begin(); it != ite; ++it)
170       {
171         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
172         if (isCompressible_)
173           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
174
175         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
176         if (hasBounds_)
177           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
178
179         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
180         if (size > attributesSizes[it->first])
181           attributesSizes[it->first] = size;
182       }
183     }
184
185     return attributesSizes;
186   }
187
188   //----------------------------------------------------------------
189
190   StdString CAxis::GetName(void)   { return (StdString("axis")); }
191   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
192   ENodeType CAxis::GetType(void)   { return (eAxis); }
193
194   //----------------------------------------------------------------
195
196   CAxis* CAxis::createAxis()
197   {
198     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
199     return axis;
200   }
201
202   void CAxis::fillInValues(const CArray<double,1>& values)
203   {
204     this->value = values;
205   }
206
207   void CAxis::checkAttributes(void)
208   {
209      if (this->n_glo.isEmpty())
210        ERROR("CAxis::checkAttributes(void)",
211              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
212              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
213      StdSize size = this->n_glo.getValue();
214
215      if (!this->begin.isEmpty())
216      {
217        if (begin < 0 || begin > size - 1)
218          ERROR("CAxis::checkAttributes(void)",
219                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
220                << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
221      }
222      else this->begin.setValue(0);
223
224      if (!this->n.isEmpty())
225      {
226        if (n < 0 || n > size)
227          ERROR("CAxis::checkAttributes(void)",
228                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
229                << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
230      }
231      else this->n.setValue(size);
232
233      isDistributed_ = (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
234                       (!this->n.isEmpty() && (this->n != this->n_glo));
235
236      if (!this->value.isEmpty())
237      {
238        StdSize true_size = value.numElements();
239        if (this->n.getValue() != true_size)
240          ERROR("CAxis::checkAttributes(void)",
241                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
242                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
243        this->hasValue = true;
244      }
245
246      if (this->index.isEmpty())
247      {
248        index.resize(n);
249        for (int i = 0; i < n; ++i) index(i) = i+begin;
250      }
251
252      this->checkData();
253      this->checkZoom();
254      this->checkMask();
255      this->checkBounds();
256   }
257
258   void CAxis::checkData()
259   {
260      if (data_begin.isEmpty()) data_begin.setValue(0);
261
262      if (data_n.isEmpty())
263      {
264        data_n.setValue(n);
265      }
266      else if (data_n.getValue() < 0)
267      {
268        ERROR("CAxis::checkData(void)",
269              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
270              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
271      }
272
273      if (data_index.isEmpty())
274      {
275        data_index.resize(data_n);
276        for (int i = 0; i < data_n; ++i) data_index(i) = i;
277      }
278   }
279
280   void CAxis::checkZoom(void)
281   {
282     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
283     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
284   }
285
286   void CAxis::checkMask()
287   {
288      if (!mask.isEmpty())
289      {
290         if (mask.extent(0) != n)
291           ERROR("CAxis::checkMask(void)",
292                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
293                 << "The mask does not have the same size as the local domain." << std::endl
294                 << "Local size is " << n.getValue() << "." << std::endl
295                 << "Mask size is " << mask.extent(0) << ".");
296      }
297      else // (mask.isEmpty())
298      { // If no mask was defined, we create a default one without any masked point.
299         mask.resize(n);
300         for (int i = 0; i < n; ++i)
301         {
302           mask(i) = true;
303         }
304      }
305   }
306
307  void CAxis::checkBounds()
308  {
309    if (!bounds.isEmpty())
310    {
311      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
312        ERROR("CAxis::checkAttributes(void)",
313              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
314              << "Axis size is " << n.getValue() << "." << std::endl
315              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
316      hasBounds_ = true;
317    }
318    else hasBounds_ = false;
319  }
320
321  void CAxis::checkEligibilityForCompressedOutput()
322  {
323    // We don't check if the mask is valid here, just if a mask has been defined at this point.
324    isCompressible_ = !mask.isEmpty();
325  }
326
327  bool CAxis::dispatchEvent(CEventServer& event)
328   {
329      if (SuperClass::dispatchEvent(event)) return true;
330      else
331      {
332        switch(event.type)
333        {
334           case EVENT_ID_SERVER_ATTRIBUT :
335             recvServerAttribut(event);
336             return true;
337             break;
338           case EVENT_ID_INDEX:
339            recvIndex(event);
340            return true;
341            break;
342          case EVENT_ID_DISTRIBUTED_VALUE:
343            recvDistributedValue(event);
344            return true;
345            break;
346          case EVENT_ID_NON_DISTRIBUTED_VALUE:
347            recvNonDistributedValue(event);
348            return true;
349            break;
350           default :
351             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
352                    << "Unknown Event");
353           return false;
354         }
355      }
356   }
357
358   void CAxis::checkAttributesOnClient()
359   {
360     if (this->areClientAttributesChecked_) return;
361
362     this->checkAttributes();
363
364     this->areClientAttributesChecked_ = true;
365   }
366
367   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
368                                                          CServerDistributionDescription::ServerDistributionType distType)
369   {
370     CContext* context=CContext::getCurrent() ;
371
372     if (this->isClientAfterTransformationChecked) return;
373     if (context->hasClient)
374     {
375       if (n.getValue() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
376     }
377
378     this->isClientAfterTransformationChecked = true;
379   }
380
381   // Send all checked attributes to server
382   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
383                                     CServerDistributionDescription::ServerDistributionType distType)
384   {
385     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
386     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
387     CContext* context = CContext::getCurrent();
388
389     if (this->isChecked) return;
390     if (context->hasClient)
391     {
392       sendServerAttribut(globalDim, orderPositionInGrid, distType);
393       if (hasValue) sendValue();
394     }
395
396     this->isChecked = true;
397   }
398
399  void CAxis::sendValue()
400  {
401     if (n.getValue() == n_glo.getValue())
402       sendNonDistributedValue();
403     else
404       sendDistributedValue();
405  }
406
407  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
408                                     CServerDistributionDescription::ServerDistributionType distType)
409  {
410    CContext* context = CContext::getCurrent();
411    CContextClient* client = context->client;
412    int nbServer = client->serverSize;
413    int range, clientSize = client->clientSize;
414    int rank = client->clientRank;
415
416    size_t ni = this->n.getValue();
417    size_t ibegin = this->begin.getValue();
418    size_t zoom_end = global_zoom_begin+global_zoom_n-1;
419    size_t nZoomCount = 0;
420    size_t nbIndex = index.numElements();
421    for (size_t idx = 0; idx < nbIndex; ++idx)
422    {
423      size_t globalIndex = index(idx);
424      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
425    }
426
427    CArray<size_t,1> globalIndexAxis(nbIndex);
428    std::vector<size_t> globalAxisZoom(nZoomCount);
429    nZoomCount = 0;
430    for (size_t idx = 0; idx < nbIndex; ++idx)
431    {
432      size_t globalIndex = index(idx);
433      globalIndexAxis(idx) = globalIndex;
434      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
435      {
436        globalAxisZoom[nZoomCount] = globalIndex;
437        ++nZoomCount;
438      }
439    }
440
441    std::set<int> writtenInd;
442    if (isCompressible_)
443    {
444      for (int idx = 0; idx < data_index.numElements(); ++idx)
445      {
446        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
447
448        if (ind >= 0 && ind < ni && mask(ind))
449        {
450          ind += ibegin;
451          if (ind >= global_zoom_begin && ind <= zoom_end)
452            writtenInd.insert(ind);
453        }
454      }
455    }
456
457    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
458    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
459    CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
460    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
461    {
462      std::vector<int> nGlobAxis(1);
463      nGlobAxis[0] = n_glo.getValue();
464
465      size_t globalSizeIndex = 1, indexBegin, indexEnd;
466      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
467      indexBegin = 0;
468      if (globalSizeIndex <= clientSize)
469      {
470        indexBegin = rank%globalSizeIndex;
471        indexEnd = indexBegin;
472      }
473      else
474      {
475        for (int i = 0; i < clientSize; ++i)
476        {
477          range = globalSizeIndex / clientSize;
478          if (i < (globalSizeIndex%clientSize)) ++range;
479          if (i == client->clientRank) break;
480          indexBegin += range;
481        }
482        indexEnd = indexBegin + range - 1;
483      }
484
485      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
486      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
487      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
488      clientServerMap->computeServerIndexMapping(globalIndexAxis);
489      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
490      delete clientServerMap;
491    }
492    else
493    {
494      std::vector<size_t> globalIndexServer(n_glo.getValue());
495      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
496      {
497        globalIndexServer[idx] = idx;
498      }
499
500      for (int idx = 0; idx < nbServer; ++idx)
501      {
502        globalIndexAxisOnServer[idx] = globalIndexServer;
503      }
504    }
505
506    CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(),
507                                                         ite = globalIndexAxisOnServer.end();
508    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
509                                        iteVec = (globalAxisZoom).end();
510    indSrv_.clear();
511    indWrittenSrv_.clear();
512    for (; it != ite; ++it)
513    {
514      int rank = it->first;
515      const std::vector<size_t>& globalIndexTmp = it->second;
516      int nb = globalIndexTmp.size();
517
518      for (int i = 0; i < nb; ++i)
519      {
520        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
521        {
522          indSrv_[rank].push_back(globalIndexTmp[i]);
523        }
524
525        if (writtenInd.count(globalIndexTmp[i]))
526        {
527          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
528        }
529      }
530    }
531
532    connectedServerRank_.clear();
533    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
534      connectedServerRank_.push_back(it->first);
535    }
536
537    if (!indSrv_.empty())
538    {
539      std::map<int, vector<size_t> >::const_iterator itIndSrv  = indSrv_.begin(),
540                                                     iteIndSrv = indSrv_.end();
541      connectedServerRank_.clear();
542      for (; itIndSrv != iteIndSrv; ++itIndSrv)
543        connectedServerRank_.push_back(itIndSrv->first);
544    }
545    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
546  }
547
548  void CAxis::sendNonDistributedValue()
549  {
550    CContext* context = CContext::getCurrent();
551    CContextClient* client = context->client;
552    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
553
554    int zoom_end = global_zoom_begin + global_zoom_n - 1;
555    int nb = 0;
556    for (size_t idx = 0; idx < n; ++idx)
557    {
558      size_t globalIndex = begin + idx;
559      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
560    }
561
562    int nbWritten = 0;
563    if (isCompressible_)
564    {
565      for (int idx = 0; idx < data_index.numElements(); ++idx)
566      {
567        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
568
569        if (ind >= 0 && ind < n && mask(ind))
570        {
571          ind += begin;
572          if (ind >= global_zoom_begin && ind <= zoom_end)
573            ++nbWritten;
574        }
575      }
576    }
577
578    CArray<double,1> val(nb);
579    nb = 0;
580    for (size_t idx = 0; idx < n; ++idx)
581    {
582      size_t globalIndex = begin + idx;
583      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
584      {
585        val(nb) = value(idx);
586        ++nb;
587      }
588    }
589
590    CArray<int, 1> writtenInd(nbWritten);
591    nbWritten = 0;
592    if (isCompressible_)
593    {
594      for (int idx = 0; idx < data_index.numElements(); ++idx)
595      {
596        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
597
598        if (ind >= 0 && ind < n && mask(ind))
599        {
600          ind += begin;
601          if (ind >= global_zoom_begin && ind <= zoom_end)
602          {
603            writtenInd(nbWritten) = ind;
604            ++nbWritten;
605          }
606        }
607      }
608    }
609
610    if (client->isServerLeader())
611    {
612      std::list<CMessage> msgs;
613
614      const std::list<int>& ranks = client->getRanksServerLeader();
615      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
616      {
617        msgs.push_back(CMessage());
618        CMessage& msg = msgs.back();
619        msg << this->getId();
620        msg << val;
621        if (isCompressible_)
622          msg << writtenInd;
623        event.push(*itRank, 1, msg);
624      }
625      client->sendEvent(event);
626    }
627    else client->sendEvent(event);
628  }
629
630  void CAxis::sendDistributedValue(void)
631  {
632    int ns, n, i, j, ind, nv, idx;
633    CContext* context = CContext::getCurrent();
634    CContextClient* client=context->client;
635
636    // send value for each connected server
637    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
638    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
639
640    list<CMessage> list_msgsIndex, list_msgsVal;
641    list<CArray<int,1> > list_indi;
642    list<CArray<int,1> > list_writtenInd;
643    list<CArray<double,1> > list_val;
644    list<CArray<double,2> > list_bounds;
645
646    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
647    iteMap = indSrv_.end();
648    for (int k = 0; k < connectedServerRank_.size(); ++k)
649    {
650      int nbData = 0;
651      int rank = connectedServerRank_[k];
652      it = indSrv_.find(rank);
653      if (iteMap != it)
654        nbData = it->second.size();
655
656      list_indi.push_back(CArray<int,1>(nbData));
657      list_val.push_back(CArray<double,1>(nbData));
658
659      if (hasBounds_)
660      {
661        list_bounds.push_back(CArray<double,2>(2,nbData));
662      }
663
664      CArray<int,1>& indi = list_indi.back();
665      CArray<double,1>& val = list_val.back();
666
667      for (n = 0; n < nbData; ++n)
668      {
669        idx = static_cast<int>(it->second[n]);
670        ind = idx - begin;
671
672        val(n) = value(ind);
673        indi(n) = idx;
674
675        if (hasBounds_)
676        {
677          CArray<double,2>& boundsVal = list_bounds.back();
678          boundsVal(0, n) = bounds(0,n);
679          boundsVal(1, n) = bounds(1,n);
680        }
681      }
682
683      list_msgsIndex.push_back(CMessage());
684      list_msgsIndex.back() << this->getId() << list_indi.back();
685
686      if (isCompressible_)
687      {
688        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
689        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
690        CArray<int,1>& writtenInd = list_writtenInd.back();
691
692        for (n = 0; n < writtenInd.numElements(); ++n)
693          writtenInd(n) = writtenIndSrc[n];
694
695        list_msgsIndex.back() << writtenInd;
696      }
697
698      list_msgsVal.push_back(CMessage());
699      list_msgsVal.back() << this->getId() << list_val.back();
700
701      if (hasBounds_)
702      {
703        list_msgsVal.back() << list_bounds.back();
704      }
705
706      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
707      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
708    }
709
710    client->sendEvent(eventIndex);
711    client->sendEvent(eventVal);
712  }
713
714  void CAxis::recvIndex(CEventServer& event)
715  {
716    CAxis* axis;
717
718    list<CEventServer::SSubEvent>::iterator it;
719    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
720    {
721      CBufferIn* buffer = it->buffer;
722      string axisId;
723      *buffer >> axisId;
724      axis = get(axisId);
725      axis->recvIndex(it->rank, *buffer);
726    }
727
728    if (axis->isCompressible_)
729    {
730      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
731
732      CContextServer* server = CContext::getCurrent()->server;
733      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
734      MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
735      MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
736      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
737    }
738  }
739
740  void CAxis::recvIndex(int rank, CBufferIn& buffer)
741  {
742    buffer >> indiSrv_[rank];
743
744    if (isCompressible_)
745    {
746      CArray<int, 1> writtenIndexes;
747      buffer >> writtenIndexes;
748      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
749      for (int i = 0; i < writtenIndexes.numElements(); ++i)
750        indexesToWrite.push_back(writtenIndexes(i));
751    }
752  }
753
754  void CAxis::recvDistributedValue(CEventServer& event)
755  {
756    list<CEventServer::SSubEvent>::iterator it;
757    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
758    {
759      CBufferIn* buffer = it->buffer;
760      string axisId;
761      *buffer >> axisId;
762      get(axisId)->recvDistributedValue(it->rank, *buffer);
763    }
764  }
765
766  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
767  {
768    CArray<int,1> &indi = indiSrv_[rank];
769    CArray<double,1> val;
770    CArray<double,2> boundsVal;
771
772    buffer >> val;
773    if (hasBounds_) buffer >> boundsVal;
774
775    int i, j, ind_srv;
776    for (int ind = 0; ind < indi.numElements(); ++ind)
777    {
778      i = indi(ind);
779      ind_srv = i - zoom_begin_srv;
780      value_srv(ind_srv) = val(ind);
781      if (hasBounds_)
782      {
783        bound_srv(0,ind_srv) = boundsVal(0, ind);
784        bound_srv(1,ind_srv) = boundsVal(1, ind);
785      }
786    }
787  }
788
789   void CAxis::recvNonDistributedValue(CEventServer& event)
790  {
791    CAxis* axis;
792
793    list<CEventServer::SSubEvent>::iterator it;
794    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
795    {
796      CBufferIn* buffer = it->buffer;
797      string axisId;
798      *buffer >> axisId;
799      axis = get(axisId);
800      axis->recvNonDistributedValue(it->rank, *buffer);
801    }
802
803    if (axis->isCompressible_)
804    {
805      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
806
807      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
808      axis->offsetWrittenIndexes_ = 0;
809    }
810  }
811
812  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
813  {
814    CArray<double,1> val;
815    buffer >> val;
816
817    for (int ind = 0; ind < val.numElements(); ++ind)
818    {
819      value_srv(ind) = val(ind);
820      if (hasBounds_)
821      {
822        bound_srv(0,ind) = bounds(0,ind);
823        bound_srv(1,ind) = bounds(1,ind);
824      }
825    }
826
827    if (isCompressible_)
828    {
829      CArray<int, 1> writtenIndexes;
830      buffer >> writtenIndexes;
831      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
832      for (int i = 0; i < writtenIndexes.numElements(); ++i)
833        indexesToWrite.push_back(writtenIndexes(i));
834    }
835  }
836
837  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
838                                 CServerDistributionDescription::ServerDistributionType distType)
839  {
840    CContext* context = CContext::getCurrent();
841    CContextClient* client = context->client;
842    int nbServer = client->serverSize;
843
844    CServerDistributionDescription serverDescription(globalDim, nbServer);
845    serverDescription.computeServerDistribution();
846
847    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
848    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
849
850    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
851    if (client->isServerLeader())
852    {
853      std::list<CMessage> msgs;
854
855      const std::list<int>& ranks = client->getRanksServerLeader();
856      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
857      {
858        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
859        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
860        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
861        const int end   = begin + ni - 1;
862
863        msgs.push_back(CMessage());
864        CMessage& msg = msgs.back();
865        msg << this->getId();
866        msg << ni << begin << end;
867        msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
868        msg << isCompressible_;
869
870        event.push(*itRank,1,msg);
871      }
872      client->sendEvent(event);
873    }
874    else client->sendEvent(event);
875  }
876
877  void CAxis::recvServerAttribut(CEventServer& event)
878  {
879    CBufferIn* buffer = event.subEvents.begin()->buffer;
880    string axisId;
881    *buffer >> axisId;
882    get(axisId)->recvServerAttribut(*buffer);
883  }
884
885  void CAxis::recvServerAttribut(CBufferIn& buffer)
886  {
887    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
888
889    buffer >> ni_srv >> begin_srv >> end_srv;
890    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
891    buffer >> isCompressible_;
892    global_zoom_begin = global_zoom_begin_tmp;
893    global_zoom_n  = global_zoom_n_tmp;
894    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
895
896    zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
897    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
898    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
899
900    if (zoom_size_srv<=0)
901    {
902      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
903    }
904
905    if (n_glo == n)
906    {
907      zoom_begin_srv = global_zoom_begin;
908      zoom_end_srv   = global_zoom_end; //zoom_end;
909      zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
910    }
911    if (hasValue)
912    {
913      value_srv.resize(zoom_size_srv);
914      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
915    }
916  }
917
918  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
919  {
920    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
921    return transformationMap_.back().second;
922  }
923
924  bool CAxis::hasTransformation()
925  {
926    return (!transformationMap_.empty());
927  }
928
929  void CAxis::setTransformations(const TransMapTypes& axisTrans)
930  {
931    transformationMap_ = axisTrans;
932  }
933
934  CAxis::TransMapTypes CAxis::getAllTransformations(void)
935  {
936    return transformationMap_;
937  }
938
939  /*!
940    Check the validity of all transformations applied on axis
941  This functions is called AFTER all inherited attributes are solved
942  */
943  void CAxis::checkTransformations()
944  {
945    TransMapTypes::const_iterator itb = transformationMap_.begin(), it,
946                                  ite = transformationMap_.end();
947//    for (it = itb; it != ite; ++it)
948//    {
949//      (it->second)->checkValid(this);
950//    }
951  }
952
953  void CAxis::duplicateTransformation(CAxis* src)
954  {
955    if (src->hasTransformation())
956    {
957      this->setTransformations(src->getAllTransformations());
958    }
959  }
960
961  /*!
962   * Go through the hierarchy to find the axis from which the transformations must be inherited
963   */
964  void CAxis::solveInheritanceTransformation()
965  {
966    if (hasTransformation() || !hasDirectAxisReference())
967      return;
968
969    CAxis* axis = this;
970    std::vector<CAxis*> refAxis;
971    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
972    {
973      refAxis.push_back(axis);
974      axis = axis->getDirectAxisReference();
975    }
976
977    if (axis->hasTransformation())
978      for (size_t i = 0; i < refAxis.size(); ++i)
979        refAxis[i]->setTransformations(axis->getAllTransformations());
980  }
981
982  void CAxis::parse(xml::CXMLNode & node)
983  {
984    SuperClass::parse(node);
985
986    if (node.goToChildElement())
987    {
988      StdString nodeElementName;
989      do
990      {
991        StdString nodeId("");
992        if (node.getAttributes().end() != node.getAttributes().find("id"))
993        { nodeId = node.getAttributes()["id"]; }
994
995        nodeElementName = node.getElementName();
996        std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_.end(), it;
997        it = transformationMapList_.find(nodeElementName);
998        if (ite != it)
999        {
1000          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1001                                                                                                               nodeId,
1002                                                                                                               &node)));
1003        }
1004      } while (node.goToNextElement()) ;
1005      node.goToParentElement();
1006    }
1007  }
1008
1009  DEFINE_REF_FUNC(Axis,axis)
1010
1011   ///---------------------------------------------------------------
1012
1013} // namespace xios
Note: See TracBrowser for help on using the repository browser.