source: XIOS/trunk/src/node/axis.cpp @ 731

Last change on this file since 731 was 731, checked in by rlacroix, 9 years ago

Correctly estimate the needed buffer sizes.

The attributes were not considered which could lead to incorrect estimations.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 29.0 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), baseRefObject(), areClientAttributesChecked_(false)
26      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
27      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
28      , transformationMap_(), global_zoom_begin(0), global_zoom_size(0)
29   {
30   }
31
32   CAxis::CAxis(const StdString & id)
33      : CObjectTemplate<CAxis>(id)
34      , CAxisAttributes(), isChecked(false), relFiles(), baseRefObject(), areClientAttributesChecked_(false)
35      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
36      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
37      , transformationMap_(), global_zoom_begin(0), global_zoom_size(0)
38   {
39   }
40
41   CAxis::~CAxis(void)
42   { /* Ne rien faire de plus */ }
43
44   ///---------------------------------------------------------------
45
46   const std::set<StdString> & CAxis::getRelFiles(void) const
47   {
48      return (this->relFiles);
49   }
50
51   bool CAxis::IsWritten(const StdString & filename) const
52   {
53      return (this->relFiles.find(filename) != this->relFiles.end());
54   }
55
56   bool CAxis::isWrittenCompressed(const StdString& filename) const
57   {
58      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
59   }
60
61   bool CAxis::isDistributed(void) const
62   {
63      return isDistributed_;
64   }
65
66   /*!
67    * Test whether the data defined on the axis can be outputted in a compressed way.
68    *
69    * \return true if and only if a mask was defined for this axis
70    */
71   bool CAxis::isCompressible(void) const
72   {
73      return isCompressible_;
74   }
75
76   void CAxis::addRelFile(const StdString & filename)
77   {
78      this->relFiles.insert(filename);
79   }
80
81   void CAxis::addRelFileCompressed(const StdString& filename)
82   {
83      this->relFilesCompressed.insert(filename);
84   }
85
86   //----------------------------------------------------------------
87
88   const std::vector<int>& CAxis::getIndexesToWrite(void) const
89   {
90     return indexesToWrite;
91   }
92
93   /*!
94     Returns the number of indexes written by each server.
95     \return the number of indexes written by each server
96   */
97   int CAxis::getNumberWrittenIndexes() const
98   {
99     return numberWrittenIndexes_;
100   }
101
102   /*!
103     Returns the total number of indexes written by the servers.
104     \return the total number of indexes written by the servers
105   */
106   int CAxis::getTotalNumberWrittenIndexes() const
107   {
108     return totalNumberWrittenIndexes_;
109   }
110
111   /*!
112     Returns the offset of indexes written by each server.
113     \return the offset of indexes written by each server
114   */
115   int CAxis::getOffsetWrittenIndexes() const
116   {
117     return offsetWrittenIndexes_;
118   }
119
120   //----------------------------------------------------------------
121
122   /*!
123    * Compute the minimum buffer size required to send the attributes to the server(s).
124    *
125    * \return A map associating the server rank with its minimum buffer size.
126    */
127   std::map<int, StdSize> CAxis::getAttributesBufferSize()
128   {
129     CContextClient* client = CContext::getCurrent()->client;
130
131     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
132
133     bool isNonDistributed = (n == n_glo);
134
135     if (client->isServerLeader())
136     {
137       // size estimation for sendServerAttribut
138       size_t size = 6 * sizeof(size_t);
139       // size estimation for sendNonDistributedValue
140       if (isNonDistributed)
141         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
142       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
143
144       const std::list<int>& ranks = client->getRanksServerLeader();
145       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
146       {
147         if (size > attributesSizes[*itRank])
148           attributesSizes[*itRank] = size;
149       }
150     }
151
152     if (!isNonDistributed)
153     {
154       // size estimation for sendDistributedValue
155       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
156       for (it = indSrv_.begin(); it != ite; ++it)
157       {
158         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
159         if (isCompressible_)
160           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
161
162         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
163         if (hasBounds_)
164           sizeValEvent += CArray<double,2>::size(it->second.size());
165
166         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
167         if (size > attributesSizes[it->first])
168           attributesSizes[it->first] = size;
169       }
170     }
171
172     return attributesSizes;
173   }
174
175   //----------------------------------------------------------------
176
177   StdString CAxis::GetName(void)   { return (StdString("axis")); }
178   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
179   ENodeType CAxis::GetType(void)   { return (eAxis); }
180
181   //----------------------------------------------------------------
182
183   CAxis* CAxis::createAxis()
184   {
185     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
186     return axis;
187   }
188
189   void CAxis::checkAttributes(void)
190   {
191      if (this->n_glo.isEmpty())
192        ERROR("CAxis::checkAttributes(void)",
193              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
194              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
195      StdSize size = this->n_glo.getValue();
196
197      isDistributed_ = !this->begin.isEmpty() || !this->n.isEmpty();
198
199      if (!this->begin.isEmpty())
200      {
201        if (begin < 0 || begin > size - 1)
202          ERROR("CAxis::checkAttributes(void)",
203                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
204                << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
205      }
206      else this->begin.setValue(0);
207
208      if (!this->n.isEmpty())
209      {
210        if (n < 0 || n > size)
211          ERROR("CAxis::checkAttributes(void)",
212                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
213                << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
214      }
215      else this->n.setValue(size);
216
217      StdSize true_size = value.numElements();
218      if (this->n.getValue() != true_size)
219        ERROR("CAxis::checkAttributes(void)",
220              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
221              << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
222
223      this->checkData();
224      this->checkZoom();
225      this->checkMask();
226      this->checkBounds();
227   }
228
229   void CAxis::checkData()
230   {
231      if (data_begin.isEmpty()) data_begin.setValue(0);
232
233      if (data_n.isEmpty())
234      {
235        data_n.setValue(n);
236      }
237      else if (data_n.getValue() < 0)
238      {
239        ERROR("CAxis::checkData(void)",
240              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
241              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
242      }
243
244      if (data_index.isEmpty())
245      {
246        data_index.resize(data_n);
247        for (int i = 0; i < data_n; ++i) data_index(i) = i;
248      }
249   }
250
251   void CAxis::checkZoom(void)
252   {
253     if (0 == global_zoom_size) global_zoom_size = this->n_glo.getValue();
254   }
255
256   void CAxis::checkMask()
257   {
258      if (!mask.isEmpty())
259      {
260         if (mask.extent(0) != n)
261           ERROR("CAxis::checkMask(void)",
262                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
263                 << "The mask does not have the same size as the local domain." << std::endl
264                 << "Local size is " << n.getValue() << "." << std::endl
265                 << "Mask size is " << mask.extent(0) << ".");
266      }
267      else // (mask.isEmpty())
268      { // If no mask was defined, we create a default one without any masked point.
269         mask.resize(n);
270         for (int i = 0; i < n; ++i)
271         {
272           mask(i) = true;
273         }
274      }
275   }
276
277  void CAxis::checkBounds()
278  {
279    if (!bounds.isEmpty())
280    {
281      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
282        ERROR("CAxis::checkAttributes(void)",
283              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
284              << "Axis size is " << n.getValue() << "." << std::endl
285              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
286      hasBounds_ = true;
287    }
288    else hasBounds_ = false;
289  }
290
291  void CAxis::checkEligibilityForCompressedOutput()
292  {
293    // We don't check if the mask is valid here, just if a mask has been defined at this point.
294    isCompressible_ = !mask.isEmpty();
295  }
296
297  bool CAxis::dispatchEvent(CEventServer& event)
298   {
299      if (SuperClass::dispatchEvent(event)) return true;
300      else
301      {
302        switch(event.type)
303        {
304           case EVENT_ID_SERVER_ATTRIBUT :
305             recvServerAttribut(event);
306             return true;
307             break;
308           case EVENT_ID_INDEX:
309            recvIndex(event);
310            return true;
311            break;
312          case EVENT_ID_DISTRIBUTED_VALUE:
313            recvDistributedValue(event);
314            return true;
315            break;
316          case EVENT_ID_NON_DISTRIBUTED_VALUE:
317            recvNonDistributedValue(event);
318            return true;
319            break;
320           default :
321             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
322                    << "Unknown Event");
323           return false;
324         }
325      }
326   }
327
328   void CAxis::checkAttributesOnClient(const std::vector<int>& globalDim, int orderPositionInGrid,
329                                       CServerDistributionDescription::ServerDistributionType distType)
330   {
331     if (this->areClientAttributesChecked_) return;
332
333     this->checkAttributes();
334
335     this->areClientAttributesChecked_ = true;
336   }
337
338   // Send all checked attributes to server
339   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
340                                     CServerDistributionDescription::ServerDistributionType distType)
341   {
342     if (!this->areClientAttributesChecked_) checkAttributesOnClient(globalDim,
343                                                                     orderPositionInGrid,
344                                                                     distType);
345     CContext* context = CContext::getCurrent();
346
347     if (this->isChecked) return;
348     if (context->hasClient)
349     {
350       sendServerAttribut(globalDim, orderPositionInGrid, distType);
351       sendValue();
352     }
353
354     this->isChecked = true;
355   }
356
357  void CAxis::sendValue()
358  {
359     if (n.getValue() == n_glo.getValue())
360     {
361       sendNonDistributedValue();
362     }
363     else
364     {
365       computeConnectedServer();
366       sendDistributedValue();
367     }
368  }
369
370  void CAxis::computeConnectedServer()
371  {
372    CContext* context = CContext::getCurrent();
373    CContextClient* client = context->client;
374    int nbServer = client->serverSize;
375    int range, clientSize = client->clientSize;
376
377    size_t ni = this->n.getValue();
378    size_t ibegin = this->begin.getValue();
379    size_t zoom_end = global_zoom_begin+global_zoom_size-1;
380    size_t nZoomCount = 0;
381    for (size_t idx = 0; idx < ni; ++idx)
382    {
383      size_t globalIndex = ibegin + idx;
384
385      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
386    }
387
388    CArray<size_t,1> globalIndexAxis(ni);
389    std::vector<size_t> globalAxisZoom(nZoomCount);
390    nZoomCount = 0;
391    for (size_t idx = 0; idx < ni; ++idx)
392    {
393      size_t globalIndex = ibegin + idx;
394      globalIndexAxis(idx) = globalIndex;
395      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
396      {
397        globalAxisZoom[nZoomCount] = globalIndex;
398        ++nZoomCount;
399      }
400    }
401
402    std::set<int> writtenInd;
403    if (isCompressible_)
404    {
405      for (int idx = 0; idx < data_index.numElements(); ++idx)
406      {
407        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
408
409        if (ind >= 0 && ind < ni && mask(ind))
410        {
411          ind += ibegin;
412          if (ind >= global_zoom_begin && ind <= zoom_end)
413            writtenInd.insert(ind);
414        }
415      }
416    }
417
418    std::vector<int> nGlobDomain(1);
419    nGlobDomain[0] = n_glo.getValue();
420
421    size_t globalSizeIndex = 1, indexBegin, indexEnd;
422    for (int i = 0; i < nGlobDomain.size(); ++i) globalSizeIndex *= nGlobDomain[i];
423    indexBegin = 0;
424    for (int i = 0; i < clientSize; ++i)
425    {
426      range = globalSizeIndex / clientSize;
427      if (i < (globalSizeIndex%clientSize)) ++range;
428      if (i == client->clientRank) break;
429      indexBegin += range;
430    }
431    indexEnd = indexBegin + range - 1;
432
433    CServerDistributionDescription serverDescription(nGlobDomain);
434    serverDescription.computeServerGlobalIndexInRange(nbServer, std::make_pair<size_t,size_t>(indexBegin, indexEnd), 0);
435    CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
436    clientServerMap->computeServerIndexMapping(globalIndexAxis);
437    const std::map<int, std::vector<size_t> >& globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
438
439    std::map<int, std::vector<size_t> >::const_iterator it = globalIndexAxisOnServer.begin(),
440                                                       ite = globalIndexAxisOnServer.end();
441    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
442                                        iteVec = (globalAxisZoom).end();
443    indSrv_.clear();
444    indWrittenSrv_.clear();
445    for (; it != ite; ++it)
446    {
447      int rank = it->first;
448      const std::vector<size_t>& globalIndexTmp = it->second;
449      int nb = globalIndexTmp.size();
450
451      for (int i = 0; i < nb; ++i)
452      {
453        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
454        {
455          indSrv_[rank].push_back(globalIndexTmp[i]);
456        }
457
458        if (writtenInd.count(globalIndexTmp[i]))
459        {
460          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
461        }
462      }
463    }
464
465    connectedServerRank_.clear();
466    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
467      connectedServerRank_.push_back(it->first);
468    }
469
470    if (!indSrv_.empty())
471    {
472      connectedServerRank_.clear();
473      for (it = indSrv_.begin(); it != indSrv_.end(); ++it)
474        connectedServerRank_.push_back(it->first);
475    }
476    nbConnectedClients_ = clientServerMap->computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
477    delete clientServerMap;
478  }
479
480  void CAxis::sendNonDistributedValue()
481  {
482    CContext* context = CContext::getCurrent();
483    CContextClient* client = context->client;
484    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
485
486    int zoom_end = global_zoom_begin + global_zoom_size - 1;
487    int nb = 0;
488    for (size_t idx = 0; idx < n; ++idx)
489    {
490      size_t globalIndex = begin + idx;
491      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
492    }
493
494    int nbWritten = 0;
495    if (isCompressible_)
496    {
497      for (int idx = 0; idx < data_index.numElements(); ++idx)
498      {
499        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
500
501        if (ind >= 0 && ind < n && mask(ind))
502        {
503          ind += begin;
504          if (ind >= global_zoom_begin && ind <= zoom_end)
505            ++nbWritten;
506        }
507      }
508    }
509
510    CArray<double,1> val(nb);
511    nb = 0;
512    for (size_t idx = 0; idx < n; ++idx)
513    {
514      size_t globalIndex = begin + idx;
515      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
516      {
517        val(nb) = value(idx);
518        ++nb;
519      }
520    }
521
522    CArray<int, 1> writtenInd(nbWritten);
523    nbWritten = 0;
524    if (isCompressible_)
525    {
526      for (int idx = 0; idx < data_index.numElements(); ++idx)
527      {
528        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
529
530        if (ind >= 0 && ind < n && mask(ind))
531        {
532          ind += begin;
533          if (ind >= global_zoom_begin && ind <= zoom_end)
534          {
535            writtenInd(nbWritten) = ind;
536            ++nbWritten;
537          }
538        }
539      }
540    }
541
542    if (client->isServerLeader())
543    {
544      std::list<CMessage> msgs;
545
546      const std::list<int>& ranks = client->getRanksServerLeader();
547      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
548      {
549        msgs.push_back(CMessage());
550        CMessage& msg = msgs.back();
551        msg << this->getId();
552        msg << val;
553        if (isCompressible_)
554          msg << writtenInd;
555        event.push(*itRank, 1, msg);
556      }
557      client->sendEvent(event);
558    }
559    else client->sendEvent(event);
560  }
561
562  void CAxis::sendDistributedValue(void)
563  {
564    int ns, n, i, j, ind, nv, idx;
565    CContext* context = CContext::getCurrent();
566    CContextClient* client=context->client;
567
568    // send value for each connected server
569    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
570    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
571
572    list<CMessage> list_msgsIndex, list_msgsVal;
573    list<CArray<int,1> > list_indi;
574    list<CArray<int,1> > list_writtenInd;
575    list<CArray<double,1> > list_val;
576    list<CArray<double,2> > list_bounds;
577
578    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
579    iteMap = indSrv_.end();
580    for (int k = 0; k < connectedServerRank_.size(); ++k)
581    {
582      int nbData = 0;
583      int rank = connectedServerRank_[k];
584      it = indSrv_.find(rank);
585      if (iteMap != it)
586        nbData = it->second.size();
587
588      list_indi.push_back(CArray<int,1>(nbData));
589      list_val.push_back(CArray<double,1>(nbData));
590
591      if (hasBounds_)
592      {
593        list_bounds.push_back(CArray<double,2>(2,nbData));
594      }
595
596      CArray<int,1>& indi = list_indi.back();
597      CArray<double,1>& val = list_val.back();
598
599      for (n = 0; n < nbData; ++n)
600      {
601        idx = static_cast<int>(it->second[n]);
602        ind = idx - begin;
603
604        val(n) = value(ind);
605        indi(n) = idx;
606
607        if (hasBounds_)
608        {
609          CArray<double,2>& boundsVal = list_bounds.back();
610          boundsVal(0, n) = bounds(0,n);
611          boundsVal(1, n) = bounds(1,n);
612        }
613      }
614
615      list_msgsIndex.push_back(CMessage());
616      list_msgsIndex.back() << this->getId() << list_indi.back();
617
618      if (isCompressible_)
619      {
620        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
621        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
622        CArray<int,1>& writtenInd = list_writtenInd.back();
623
624        for (n = 0; n < writtenInd.numElements(); ++n)
625          writtenInd(n) = writtenIndSrc[n];
626
627        list_msgsIndex.back() << writtenInd;
628      }
629
630      list_msgsVal.push_back(CMessage());
631      list_msgsVal.back() << this->getId() << list_val.back();
632
633      if (hasBounds_)
634      {
635        list_msgsVal.back() << list_bounds.back();
636      }
637
638      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
639      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
640    }
641
642    client->sendEvent(eventIndex);
643    client->sendEvent(eventVal);
644  }
645
646  void CAxis::recvIndex(CEventServer& event)
647  {
648    CAxis* axis;
649
650    list<CEventServer::SSubEvent>::iterator it;
651    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
652    {
653      CBufferIn* buffer = it->buffer;
654      string axisId;
655      *buffer >> axisId;
656      axis = get(axisId);
657      axis->recvIndex(it->rank, *buffer);
658    }
659
660    if (axis->isCompressible_)
661    {
662      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
663
664      CContextServer* server = CContext::getCurrent()->server;
665      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
666      MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
667      MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
668      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
669    }
670  }
671
672  void CAxis::recvIndex(int rank, CBufferIn& buffer)
673  {
674    buffer >> indiSrv_[rank];
675
676    if (isCompressible_)
677    {
678      CArray<int, 1> writtenIndexes;
679      buffer >> writtenIndexes;
680      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
681      for (int i = 0; i < writtenIndexes.numElements(); ++i)
682        indexesToWrite.push_back(writtenIndexes(i));
683    }
684  }
685
686  void CAxis::recvDistributedValue(CEventServer& event)
687  {
688    list<CEventServer::SSubEvent>::iterator it;
689    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
690    {
691      CBufferIn* buffer = it->buffer;
692      string axisId;
693      *buffer >> axisId;
694      get(axisId)->recvDistributedValue(it->rank, *buffer);
695    }
696  }
697
698  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
699  {
700    CArray<int,1> &indi = indiSrv_[rank];
701    CArray<double,1> val;
702    CArray<double,2> boundsVal;
703
704    buffer >> val;
705    if (hasBounds_) buffer >> boundsVal;
706
707    int i, j, ind_srv;
708    for (int ind = 0; ind < indi.numElements(); ++ind)
709    {
710      i = indi(ind);
711      ind_srv = i - zoom_begin_srv;
712      value_srv(ind_srv) = val(ind);
713      if (hasBounds_)
714      {
715        bound_srv(0,ind_srv) = boundsVal(0, ind);
716        bound_srv(1,ind_srv) = boundsVal(1, ind);
717      }
718    }
719  }
720
721   void CAxis::recvNonDistributedValue(CEventServer& event)
722  {
723    CAxis* axis;
724
725    list<CEventServer::SSubEvent>::iterator it;
726    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
727    {
728      CBufferIn* buffer = it->buffer;
729      string axisId;
730      *buffer >> axisId;
731      axis = get(axisId);
732      axis->recvNonDistributedValue(it->rank, *buffer);
733    }
734
735    if (axis->isCompressible_)
736    {
737      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
738
739      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
740      axis->offsetWrittenIndexes_ = 0;
741    }
742  }
743
744  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
745  {
746    CArray<double,1> val;
747    buffer >> val;
748
749    for (int ind = 0; ind < val.numElements(); ++ind)
750    {
751      value_srv(ind) = val(ind);
752      if (hasBounds_)
753      {
754        bound_srv(0,ind) = bounds(0,ind);
755        bound_srv(1,ind) = bounds(1,ind);
756      }
757    }
758
759    if (isCompressible_)
760    {
761      CArray<int, 1> writtenIndexes;
762      buffer >> writtenIndexes;
763      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
764      for (int i = 0; i < writtenIndexes.numElements(); ++i)
765        indexesToWrite.push_back(writtenIndexes(i));
766    }
767  }
768
769  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
770                                 CServerDistributionDescription::ServerDistributionType distType)
771  {
772    CContext* context = CContext::getCurrent();
773    CContextClient* client = context->client;
774
775    CServerDistributionDescription serverDescription(globalDim);
776
777    int nbServer = client->serverSize;
778
779    serverDescription.computeServerDistribution(nbServer, false, distType);
780    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
781    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
782
783    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
784    if (client->isServerLeader())
785    {
786      std::list<CMessage> msgs;
787
788      const std::list<int>& ranks = client->getRanksServerLeader();
789      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
790      {
791        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
792        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
793        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
794        const int end   = begin + ni - 1;
795
796        msgs.push_back(CMessage());
797        CMessage& msg = msgs.back();
798        msg << this->getId();
799        msg << ni << begin << end;
800        msg << global_zoom_begin << global_zoom_size;
801        msg << isCompressible_;
802
803        event.push(*itRank,1,msg);
804      }
805      client->sendEvent(event);
806    }
807    else client->sendEvent(event);
808  }
809
810  void CAxis::recvServerAttribut(CEventServer& event)
811  {
812    CBufferIn* buffer = event.subEvents.begin()->buffer;
813    string axisId;
814    *buffer >> axisId;
815    get(axisId)->recvServerAttribut(*buffer);
816  }
817
818  void CAxis::recvServerAttribut(CBufferIn& buffer)
819  {
820    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_size_tmp;
821
822    buffer >> ni_srv >> begin_srv >> end_srv;
823    buffer >> global_zoom_begin_tmp >> global_zoom_size_tmp;
824    buffer >> isCompressible_;
825    global_zoom_begin = global_zoom_begin_tmp;
826    global_zoom_size  = global_zoom_size_tmp;
827    int global_zoom_end = global_zoom_begin + global_zoom_size - 1;
828
829    zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
830    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
831    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
832
833    if (zoom_size_srv<=0)
834    {
835      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
836    }
837
838    if (n_glo == n)
839    {
840      zoom_begin_srv = global_zoom_begin;
841      zoom_end_srv   = global_zoom_end; //zoom_end;
842      zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
843    }
844    value_srv.resize(zoom_size_srv);
845    bound_srv.resize(2,zoom_size_srv);
846  }
847
848  bool CAxis::hasTransformation()
849  {
850    return (!transformationMap_.empty());
851  }
852
853  void CAxis::setTransformations(const TransMapTypes& axisTrans)
854  {
855    transformationMap_ = axisTrans;
856  }
857
858  CAxis::TransMapTypes CAxis::getAllTransformations(void)
859  {
860    return transformationMap_;
861  }
862
863  /*!
864    Check the validity of all transformations applied on axis
865  This functions is called AFTER all inherited attributes are solved
866  */
867  void CAxis::checkTransformations()
868  {
869    TransMapTypes::const_iterator itb = transformationMap_.begin(), it,
870                                  ite = transformationMap_.end();
871    for (it = itb; it != ite; ++it)
872    {
873      (it->second)->checkValid(this);
874    }
875  }
876
877  void CAxis::solveInheritanceTransformation()
878  {
879    if (this->hasTransformation()) return;
880
881    std::vector<CAxis*> refAxis;
882    CAxis* refer_sptr;
883    CAxis* refer_ptr = this;
884    while (refer_ptr->hasDirectAxisReference())
885    {
886      refAxis.push_back(refer_ptr);
887      refer_sptr = refer_ptr->getDirectAxisReference();
888      refer_ptr  = refer_sptr;
889      if (refer_ptr->hasTransformation()) break;
890    }
891
892    if (refer_ptr->hasTransformation())
893      for (int idx = 0; idx < refAxis.size(); ++idx)
894        refAxis[idx]->setTransformations(refer_ptr->getAllTransformations());
895  }
896
897  void CAxis::parse(xml::CXMLNode & node)
898  {
899    SuperClass::parse(node);
900
901    if (node.goToChildElement())
902    {
903      StdString inverseAxisDefRoot("inverse_axis_definition");
904      StdString inverse("inverse_axis");
905      StdString zoomAxisDefRoot("zoom_axis_definition");
906      StdString zoom("zoom_axis");
907      StdString interpAxisDefRoot("interpolate_axis_definition");
908      StdString interp("interpolate_axis");
909      do
910      {
911        if (node.getElementName() == inverse) {
912          CInverseAxis* tmp = (CInverseAxisGroup::get(inverseAxisDefRoot))->createChild();
913          tmp->parse(node);
914          transformationMap_.push_back(std::make_pair(TRANS_INVERSE_AXIS,tmp));
915        } else if (node.getElementName() == zoom) {
916          CZoomAxis* tmp = (CZoomAxisGroup::get(zoomAxisDefRoot))->createChild();
917          tmp->parse(node);
918          transformationMap_.push_back(std::make_pair(TRANS_ZOOM_AXIS,tmp));
919        }
920        else if (node.getElementName() == interp) {
921          CInterpolateAxis* tmp = (CInterpolateAxisGroup::get(interpAxisDefRoot))->createChild();
922          tmp->parse(node);
923          transformationMap_.push_back(std::make_pair(TRANS_INTERPOLATE_AXIS,tmp));
924        }
925      } while (node.goToNextElement()) ;
926      node.goToParentElement();
927    }
928  }
929
930  DEFINE_REF_FUNC(Axis,axis)
931
932   ///---------------------------------------------------------------
933
934} // namespace xios
Note: See TracBrowser for help on using the repository browser.