source: XIOS/trunk/src/node/axis.cpp @ 817

Last change on this file since 817 was 817, checked in by mhnguyen, 7 years ago

Adding some attributes for axis and grid (ticket 71, 78)

+) Add index attribute for axis
+) Change mask?d to mask_?d for grid

Test
+) On Curie
+) Test passes

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 30.4 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
27      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
28      , transformationMap_(), global_zoom_begin(0), global_zoom_size(0), hasValue(false)
29   {
30   }
31
32   CAxis::CAxis(const StdString & id)
33      : CObjectTemplate<CAxis>(id)
34      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
35      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
36      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
37      , transformationMap_(), global_zoom_begin(0), global_zoom_size(0), hasValue(false)
38   {
39   }
40
41   CAxis::~CAxis(void)
42   { /* Ne rien faire de plus */ }
43
44   ///---------------------------------------------------------------
45
46   const std::set<StdString> & CAxis::getRelFiles(void) const
47   {
48      return (this->relFiles);
49   }
50
51   bool CAxis::IsWritten(const StdString & filename) const
52   {
53      return (this->relFiles.find(filename) != this->relFiles.end());
54   }
55
56   bool CAxis::isWrittenCompressed(const StdString& filename) const
57   {
58      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
59   }
60
61   bool CAxis::isDistributed(void) const
62   {
63      return isDistributed_;
64   }
65
66   /*!
67    * Test whether the data defined on the axis can be outputted in a compressed way.
68    *
69    * \return true if and only if a mask was defined for this axis
70    */
71   bool CAxis::isCompressible(void) const
72   {
73      return isCompressible_;
74   }
75
76   void CAxis::addRelFile(const StdString & filename)
77   {
78      this->relFiles.insert(filename);
79   }
80
81   void CAxis::addRelFileCompressed(const StdString& filename)
82   {
83      this->relFilesCompressed.insert(filename);
84   }
85
86   //----------------------------------------------------------------
87
88   const std::vector<int>& CAxis::getIndexesToWrite(void) const
89   {
90     return indexesToWrite;
91   }
92
93   /*!
94     Returns the number of indexes written by each server.
95     \return the number of indexes written by each server
96   */
97   int CAxis::getNumberWrittenIndexes() const
98   {
99     return numberWrittenIndexes_;
100   }
101
102   /*!
103     Returns the total number of indexes written by the servers.
104     \return the total number of indexes written by the servers
105   */
106   int CAxis::getTotalNumberWrittenIndexes() const
107   {
108     return totalNumberWrittenIndexes_;
109   }
110
111   /*!
112     Returns the offset of indexes written by each server.
113     \return the offset of indexes written by each server
114   */
115   int CAxis::getOffsetWrittenIndexes() const
116   {
117     return offsetWrittenIndexes_;
118   }
119
120   //----------------------------------------------------------------
121
122   /*!
123    * Compute the minimum buffer size required to send the attributes to the server(s).
124    *
125    * \return A map associating the server rank with its minimum buffer size.
126    */
127   std::map<int, StdSize> CAxis::getAttributesBufferSize()
128   {
129     CContextClient* client = CContext::getCurrent()->client;
130
131     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
132
133     bool isNonDistributed = (n == n_glo);
134
135     if (client->isServerLeader())
136     {
137       // size estimation for sendServerAttribut
138       size_t size = 6 * sizeof(size_t);
139       // size estimation for sendNonDistributedValue
140       if (isNonDistributed)
141         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
142       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
143
144       const std::list<int>& ranks = client->getRanksServerLeader();
145       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
146       {
147         if (size > attributesSizes[*itRank])
148           attributesSizes[*itRank] = size;
149       }
150     }
151
152     if (!isNonDistributed)
153     {
154       // size estimation for sendDistributedValue
155       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
156       for (it = indSrv_.begin(); it != ite; ++it)
157       {
158         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
159         if (isCompressible_)
160           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
161
162         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
163         if (hasBounds_)
164           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
165
166         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
167         if (size > attributesSizes[it->first])
168           attributesSizes[it->first] = size;
169       }
170     }
171
172     return attributesSizes;
173   }
174
175   //----------------------------------------------------------------
176
177   StdString CAxis::GetName(void)   { return (StdString("axis")); }
178   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
179   ENodeType CAxis::GetType(void)   { return (eAxis); }
180
181   //----------------------------------------------------------------
182
183   CAxis* CAxis::createAxis()
184   {
185     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
186     return axis;
187   }
188
189   void CAxis::fillInValues(const CArray<double,1>& values)
190   {
191     this->value = values;
192   }
193
194   void CAxis::checkAttributes(void)
195   {
196      if (this->n_glo.isEmpty())
197        ERROR("CAxis::checkAttributes(void)",
198              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
199              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
200      StdSize size = this->n_glo.getValue();
201
202      isDistributed_ = !this->begin.isEmpty() || !this->n.isEmpty();
203
204      if (!this->begin.isEmpty())
205      {
206        if (begin < 0 || begin > size - 1)
207          ERROR("CAxis::checkAttributes(void)",
208                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
209                << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
210      }
211      else this->begin.setValue(0);
212
213      if (!this->n.isEmpty())
214      {
215        if (n < 0 || n > size)
216          ERROR("CAxis::checkAttributes(void)",
217                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
218                << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
219      }
220      else this->n.setValue(size);
221
222      if (!this->value.isEmpty())
223      {
224        StdSize true_size = value.numElements();
225        if (this->n.getValue() != true_size)
226          ERROR("CAxis::checkAttributes(void)",
227                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
228                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
229        this->hasValue = true;
230      }
231
232      if (this->index.isEmpty())
233      {
234        index.resize(n);
235        for (int i = 0; i < n; ++i) index(i) = i+begin;
236      }
237
238      this->checkData();
239      this->checkZoom();
240      this->checkMask();
241      this->checkBounds();
242   }
243
244   void CAxis::checkData()
245   {
246      if (data_begin.isEmpty()) data_begin.setValue(0);
247
248      if (data_n.isEmpty())
249      {
250        data_n.setValue(n);
251      }
252      else if (data_n.getValue() < 0)
253      {
254        ERROR("CAxis::checkData(void)",
255              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
256              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
257      }
258
259      if (data_index.isEmpty())
260      {
261        data_index.resize(data_n);
262        for (int i = 0; i < data_n; ++i) data_index(i) = i;
263      }
264   }
265
266   void CAxis::checkZoom(void)
267   {
268     if (0 == global_zoom_size) global_zoom_size = this->n_glo.getValue();
269   }
270
271   void CAxis::checkMask()
272   {
273      if (!mask.isEmpty())
274      {
275         if (mask.extent(0) != n)
276           ERROR("CAxis::checkMask(void)",
277                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
278                 << "The mask does not have the same size as the local domain." << std::endl
279                 << "Local size is " << n.getValue() << "." << std::endl
280                 << "Mask size is " << mask.extent(0) << ".");
281      }
282      else // (mask.isEmpty())
283      { // If no mask was defined, we create a default one without any masked point.
284         mask.resize(n);
285         for (int i = 0; i < n; ++i)
286         {
287           mask(i) = true;
288         }
289      }
290   }
291
292  void CAxis::checkBounds()
293  {
294    if (!bounds.isEmpty())
295    {
296      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
297        ERROR("CAxis::checkAttributes(void)",
298              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
299              << "Axis size is " << n.getValue() << "." << std::endl
300              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
301      hasBounds_ = true;
302    }
303    else hasBounds_ = false;
304  }
305
306  void CAxis::checkEligibilityForCompressedOutput()
307  {
308    // We don't check if the mask is valid here, just if a mask has been defined at this point.
309    isCompressible_ = !mask.isEmpty();
310  }
311
312  bool CAxis::dispatchEvent(CEventServer& event)
313   {
314      if (SuperClass::dispatchEvent(event)) return true;
315      else
316      {
317        switch(event.type)
318        {
319           case EVENT_ID_SERVER_ATTRIBUT :
320             recvServerAttribut(event);
321             return true;
322             break;
323           case EVENT_ID_INDEX:
324            recvIndex(event);
325            return true;
326            break;
327          case EVENT_ID_DISTRIBUTED_VALUE:
328            recvDistributedValue(event);
329            return true;
330            break;
331          case EVENT_ID_NON_DISTRIBUTED_VALUE:
332            recvNonDistributedValue(event);
333            return true;
334            break;
335           default :
336             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
337                    << "Unknown Event");
338           return false;
339         }
340      }
341   }
342
343   void CAxis::checkAttributesOnClient()
344   {
345     if (this->areClientAttributesChecked_) return;
346
347     this->checkAttributes();
348
349     this->areClientAttributesChecked_ = true;
350   }
351
352   // Send all checked attributes to server
353   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
354                                     CServerDistributionDescription::ServerDistributionType distType)
355   {
356     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
357     CContext* context = CContext::getCurrent();
358
359     if (this->isChecked) return;
360     if (context->hasClient)
361     {
362       sendServerAttribut(globalDim, orderPositionInGrid, distType);
363       if (hasValue) sendValue(globalDim, orderPositionInGrid, distType);
364     }
365
366     this->isChecked = true;
367   }
368
369  void CAxis::sendValue(const std::vector<int>& globalDim, int orderPositionInGrid,
370                        CServerDistributionDescription::ServerDistributionType distType)
371  {
372     if (n.getValue() == n_glo.getValue())
373     {
374       sendNonDistributedValue();
375     }
376     else
377     {
378       computeConnectedServer(globalDim, orderPositionInGrid, distType);
379       sendDistributedValue();
380     }
381  }
382
383  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
384                                     CServerDistributionDescription::ServerDistributionType distType)
385  {
386    CContext* context = CContext::getCurrent();
387    CContextClient* client = context->client;
388    int nbServer = client->serverSize;
389    int range, clientSize = client->clientSize;
390
391    size_t ni = this->n.getValue();
392    size_t ibegin = this->begin.getValue();
393    size_t zoom_end = global_zoom_begin+global_zoom_size-1;
394    size_t nZoomCount = 0;
395    size_t nbIndex = index.numElements();
396    for (size_t idx = 0; idx < nbIndex; ++idx)
397    {
398      size_t globalIndex = index(idx);
399      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
400    }
401
402    CArray<size_t,1> globalIndexAxis(nbIndex);
403    std::vector<size_t> globalAxisZoom(nZoomCount);
404    nZoomCount = 0;
405    for (size_t idx = 0; idx < nbIndex; ++idx)
406    {
407      size_t globalIndex = index(idx);
408      globalIndexAxis(idx) = globalIndex;
409      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
410      {
411        globalAxisZoom[nZoomCount] = globalIndex;
412        ++nZoomCount;
413      }
414    }
415
416    std::set<int> writtenInd;
417    if (isCompressible_)
418    {
419      for (int idx = 0; idx < data_index.numElements(); ++idx)
420      {
421        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
422
423        if (ind >= 0 && ind < ni && mask(ind))
424        {
425          ind += ibegin;
426          if (ind >= global_zoom_begin && ind <= zoom_end)
427            writtenInd.insert(ind);
428        }
429      }
430    }
431
432    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
433    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
434    std::map<int, std::vector<size_t> > globalIndexAxisOnServer;
435    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
436    {
437      std::vector<int> nGlobAxis(1);
438      nGlobAxis[0] = n_glo.getValue();
439
440      size_t globalSizeIndex = 1, indexBegin, indexEnd;
441      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
442      indexBegin = 0;
443      for (int i = 0; i < clientSize; ++i)
444      {
445        range = globalSizeIndex / clientSize;
446        if (i < (globalSizeIndex%clientSize)) ++range;
447        if (i == client->clientRank) break;
448        indexBegin += range;
449      }
450      indexEnd = indexBegin + range - 1;
451
452      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
453      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
454      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
455      clientServerMap->computeServerIndexMapping(globalIndexAxis);
456      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
457      delete clientServerMap;
458    }
459    else
460    {
461      std::vector<size_t> globalIndexServer(n_glo.getValue());
462      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
463      {
464        globalIndexServer[idx] = idx;
465      }
466
467      for (int idx = 0; idx < nbServer; ++idx)
468      {
469        globalIndexAxisOnServer[idx] = globalIndexServer;
470      }
471    }
472
473    std::map<int, std::vector<size_t> >::const_iterator it = globalIndexAxisOnServer.begin(),
474                                                       ite = globalIndexAxisOnServer.end();
475    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
476                                        iteVec = (globalAxisZoom).end();
477    indSrv_.clear();
478    indWrittenSrv_.clear();
479    for (; it != ite; ++it)
480    {
481      int rank = it->first;
482      const std::vector<size_t>& globalIndexTmp = it->second;
483      int nb = globalIndexTmp.size();
484
485      for (int i = 0; i < nb; ++i)
486      {
487        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
488        {
489          indSrv_[rank].push_back(globalIndexTmp[i]);
490        }
491
492        if (writtenInd.count(globalIndexTmp[i]))
493        {
494          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
495        }
496      }
497    }
498
499    connectedServerRank_.clear();
500    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
501      connectedServerRank_.push_back(it->first);
502    }
503
504    if (!indSrv_.empty())
505    {
506      connectedServerRank_.clear();
507      for (it = indSrv_.begin(); it != indSrv_.end(); ++it)
508        connectedServerRank_.push_back(it->first);
509    }
510    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
511  }
512
513  void CAxis::sendNonDistributedValue()
514  {
515    CContext* context = CContext::getCurrent();
516    CContextClient* client = context->client;
517    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
518
519    int zoom_end = global_zoom_begin + global_zoom_size - 1;
520    int nb = 0;
521    for (size_t idx = 0; idx < n; ++idx)
522    {
523      size_t globalIndex = begin + idx;
524      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
525    }
526
527    int nbWritten = 0;
528    if (isCompressible_)
529    {
530      for (int idx = 0; idx < data_index.numElements(); ++idx)
531      {
532        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
533
534        if (ind >= 0 && ind < n && mask(ind))
535        {
536          ind += begin;
537          if (ind >= global_zoom_begin && ind <= zoom_end)
538            ++nbWritten;
539        }
540      }
541    }
542
543    CArray<double,1> val(nb);
544    nb = 0;
545    for (size_t idx = 0; idx < n; ++idx)
546    {
547      size_t globalIndex = begin + idx;
548      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
549      {
550        val(nb) = value(idx);
551        ++nb;
552      }
553    }
554
555    CArray<int, 1> writtenInd(nbWritten);
556    nbWritten = 0;
557    if (isCompressible_)
558    {
559      for (int idx = 0; idx < data_index.numElements(); ++idx)
560      {
561        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
562
563        if (ind >= 0 && ind < n && mask(ind))
564        {
565          ind += begin;
566          if (ind >= global_zoom_begin && ind <= zoom_end)
567          {
568            writtenInd(nbWritten) = ind;
569            ++nbWritten;
570          }
571        }
572      }
573    }
574
575    if (client->isServerLeader())
576    {
577      std::list<CMessage> msgs;
578
579      const std::list<int>& ranks = client->getRanksServerLeader();
580      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
581      {
582        msgs.push_back(CMessage());
583        CMessage& msg = msgs.back();
584        msg << this->getId();
585        msg << val;
586        if (isCompressible_)
587          msg << writtenInd;
588        event.push(*itRank, 1, msg);
589      }
590      client->sendEvent(event);
591    }
592    else client->sendEvent(event);
593  }
594
595  void CAxis::sendDistributedValue(void)
596  {
597    int ns, n, i, j, ind, nv, idx;
598    CContext* context = CContext::getCurrent();
599    CContextClient* client=context->client;
600
601    // send value for each connected server
602    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
603    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
604
605    list<CMessage> list_msgsIndex, list_msgsVal;
606    list<CArray<int,1> > list_indi;
607    list<CArray<int,1> > list_writtenInd;
608    list<CArray<double,1> > list_val;
609    list<CArray<double,2> > list_bounds;
610
611    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
612    iteMap = indSrv_.end();
613    for (int k = 0; k < connectedServerRank_.size(); ++k)
614    {
615      int nbData = 0;
616      int rank = connectedServerRank_[k];
617      it = indSrv_.find(rank);
618      if (iteMap != it)
619        nbData = it->second.size();
620
621      list_indi.push_back(CArray<int,1>(nbData));
622      list_val.push_back(CArray<double,1>(nbData));
623
624      if (hasBounds_)
625      {
626        list_bounds.push_back(CArray<double,2>(2,nbData));
627      }
628
629      CArray<int,1>& indi = list_indi.back();
630      CArray<double,1>& val = list_val.back();
631
632      for (n = 0; n < nbData; ++n)
633      {
634        idx = static_cast<int>(it->second[n]);
635        ind = idx - begin;
636
637        val(n) = value(ind);
638        indi(n) = idx;
639
640        if (hasBounds_)
641        {
642          CArray<double,2>& boundsVal = list_bounds.back();
643          boundsVal(0, n) = bounds(0,n);
644          boundsVal(1, n) = bounds(1,n);
645        }
646      }
647
648      list_msgsIndex.push_back(CMessage());
649      list_msgsIndex.back() << this->getId() << list_indi.back();
650
651      if (isCompressible_)
652      {
653        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
654        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
655        CArray<int,1>& writtenInd = list_writtenInd.back();
656
657        for (n = 0; n < writtenInd.numElements(); ++n)
658          writtenInd(n) = writtenIndSrc[n];
659
660        list_msgsIndex.back() << writtenInd;
661      }
662
663      list_msgsVal.push_back(CMessage());
664      list_msgsVal.back() << this->getId() << list_val.back();
665
666      if (hasBounds_)
667      {
668        list_msgsVal.back() << list_bounds.back();
669      }
670
671      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
672      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
673    }
674
675    client->sendEvent(eventIndex);
676    client->sendEvent(eventVal);
677  }
678
679  void CAxis::recvIndex(CEventServer& event)
680  {
681    CAxis* axis;
682
683    list<CEventServer::SSubEvent>::iterator it;
684    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
685    {
686      CBufferIn* buffer = it->buffer;
687      string axisId;
688      *buffer >> axisId;
689      axis = get(axisId);
690      axis->recvIndex(it->rank, *buffer);
691    }
692
693    if (axis->isCompressible_)
694    {
695      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
696
697      CContextServer* server = CContext::getCurrent()->server;
698      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
699      MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
700      MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
701      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
702    }
703  }
704
705  void CAxis::recvIndex(int rank, CBufferIn& buffer)
706  {
707    buffer >> indiSrv_[rank];
708
709    if (isCompressible_)
710    {
711      CArray<int, 1> writtenIndexes;
712      buffer >> writtenIndexes;
713      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
714      for (int i = 0; i < writtenIndexes.numElements(); ++i)
715        indexesToWrite.push_back(writtenIndexes(i));
716    }
717  }
718
719  void CAxis::recvDistributedValue(CEventServer& event)
720  {
721    list<CEventServer::SSubEvent>::iterator it;
722    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
723    {
724      CBufferIn* buffer = it->buffer;
725      string axisId;
726      *buffer >> axisId;
727      get(axisId)->recvDistributedValue(it->rank, *buffer);
728    }
729  }
730
731  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
732  {
733    CArray<int,1> &indi = indiSrv_[rank];
734    CArray<double,1> val;
735    CArray<double,2> boundsVal;
736
737    buffer >> val;
738    if (hasBounds_) buffer >> boundsVal;
739
740    int i, j, ind_srv;
741    for (int ind = 0; ind < indi.numElements(); ++ind)
742    {
743      i = indi(ind);
744      ind_srv = i - zoom_begin_srv;
745      value_srv(ind_srv) = val(ind);
746      if (hasBounds_)
747      {
748        bound_srv(0,ind_srv) = boundsVal(0, ind);
749        bound_srv(1,ind_srv) = boundsVal(1, ind);
750      }
751    }
752  }
753
754   void CAxis::recvNonDistributedValue(CEventServer& event)
755  {
756    CAxis* axis;
757
758    list<CEventServer::SSubEvent>::iterator it;
759    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
760    {
761      CBufferIn* buffer = it->buffer;
762      string axisId;
763      *buffer >> axisId;
764      axis = get(axisId);
765      axis->recvNonDistributedValue(it->rank, *buffer);
766    }
767
768    if (axis->isCompressible_)
769    {
770      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
771
772      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
773      axis->offsetWrittenIndexes_ = 0;
774    }
775  }
776
777  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
778  {
779    CArray<double,1> val;
780    buffer >> val;
781
782    for (int ind = 0; ind < val.numElements(); ++ind)
783    {
784      value_srv(ind) = val(ind);
785      if (hasBounds_)
786      {
787        bound_srv(0,ind) = bounds(0,ind);
788        bound_srv(1,ind) = bounds(1,ind);
789      }
790    }
791
792    if (isCompressible_)
793    {
794      CArray<int, 1> writtenIndexes;
795      buffer >> writtenIndexes;
796      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
797      for (int i = 0; i < writtenIndexes.numElements(); ++i)
798        indexesToWrite.push_back(writtenIndexes(i));
799    }
800  }
801
802  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
803                                 CServerDistributionDescription::ServerDistributionType distType)
804  {
805    CContext* context = CContext::getCurrent();
806    CContextClient* client = context->client;
807    int nbServer = client->serverSize;
808
809    CServerDistributionDescription serverDescription(globalDim, nbServer);
810    serverDescription.computeServerDistribution();
811
812    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
813    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
814
815    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
816    if (client->isServerLeader())
817    {
818      std::list<CMessage> msgs;
819
820      const std::list<int>& ranks = client->getRanksServerLeader();
821      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
822      {
823        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
824        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
825        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
826        const int end   = begin + ni - 1;
827
828        msgs.push_back(CMessage());
829        CMessage& msg = msgs.back();
830        msg << this->getId();
831        msg << ni << begin << end;
832        msg << global_zoom_begin << global_zoom_size;
833        msg << isCompressible_;
834
835        event.push(*itRank,1,msg);
836      }
837      client->sendEvent(event);
838    }
839    else client->sendEvent(event);
840  }
841
842  void CAxis::recvServerAttribut(CEventServer& event)
843  {
844    CBufferIn* buffer = event.subEvents.begin()->buffer;
845    string axisId;
846    *buffer >> axisId;
847    get(axisId)->recvServerAttribut(*buffer);
848  }
849
850  void CAxis::recvServerAttribut(CBufferIn& buffer)
851  {
852    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_size_tmp;
853
854    buffer >> ni_srv >> begin_srv >> end_srv;
855    buffer >> global_zoom_begin_tmp >> global_zoom_size_tmp;
856    buffer >> isCompressible_;
857    global_zoom_begin = global_zoom_begin_tmp;
858    global_zoom_size  = global_zoom_size_tmp;
859    int global_zoom_end = global_zoom_begin + global_zoom_size - 1;
860
861    zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
862    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
863    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
864
865    if (zoom_size_srv<=0)
866    {
867      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
868    }
869
870    if (n_glo == n)
871    {
872      zoom_begin_srv = global_zoom_begin;
873      zoom_end_srv   = global_zoom_end; //zoom_end;
874      zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
875    }
876    if (hasValue)
877    {
878      value_srv.resize(zoom_size_srv);
879      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
880    }
881  }
882
883  bool CAxis::hasTransformation()
884  {
885    return (!transformationMap_.empty());
886  }
887
888  void CAxis::setTransformations(const TransMapTypes& axisTrans)
889  {
890    transformationMap_ = axisTrans;
891  }
892
893  CAxis::TransMapTypes CAxis::getAllTransformations(void)
894  {
895    return transformationMap_;
896  }
897
898  /*!
899    Check the validity of all transformations applied on axis
900  This functions is called AFTER all inherited attributes are solved
901  */
902  void CAxis::checkTransformations()
903  {
904    TransMapTypes::const_iterator itb = transformationMap_.begin(), it,
905                                  ite = transformationMap_.end();
906    for (it = itb; it != ite; ++it)
907    {
908      (it->second)->checkValid(this);
909    }
910  }
911
912  /*!
913   * Go through the hierarchy to find the axis from which the transformations must be inherited
914   */
915  void CAxis::solveInheritanceTransformation()
916  {
917    if (hasTransformation() || !hasDirectAxisReference())
918      return;
919
920    CAxis* axis = this;
921    std::vector<CAxis*> refAxis;
922    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
923    {
924      refAxis.push_back(axis);
925      axis = axis->getDirectAxisReference();
926    }
927
928    if (axis->hasTransformation())
929      for (size_t i = 0; i < refAxis.size(); ++i)
930        refAxis[i]->setTransformations(axis->getAllTransformations());
931  }
932
933  void CAxis::parse(xml::CXMLNode & node)
934  {
935    SuperClass::parse(node);
936
937    if (node.goToChildElement())
938    {
939      StdString inverseAxisDefRoot("inverse_axis_definition");
940      StdString inverse("inverse_axis");
941      StdString zoomAxisDefRoot("zoom_axis_definition");
942      StdString zoom("zoom_axis");
943      StdString interpAxisDefRoot("interpolate_axis_definition");
944      StdString interp("interpolate_axis");
945      do
946      {
947        StdString nodeId("");
948        if (node.getAttributes().end() != node.getAttributes().find("id"))
949        { nodeId = node.getAttributes()["id"]; }
950
951        if (node.getElementName() == inverse) {
952          CInverseAxis* tmp = (CInverseAxisGroup::get(inverseAxisDefRoot))->createChild(nodeId);
953          tmp->parse(node);
954          transformationMap_.push_back(std::make_pair(TRANS_INVERSE_AXIS,tmp));
955        }
956        else if (node.getElementName() == zoom) {
957          CZoomAxis* tmp = (CZoomAxisGroup::get(zoomAxisDefRoot))->createChild(nodeId);
958          tmp->parse(node);
959          transformationMap_.push_back(std::make_pair(TRANS_ZOOM_AXIS,tmp));
960        }
961        else if (node.getElementName() == interp) {
962          CInterpolateAxis* tmp = (CInterpolateAxisGroup::get(interpAxisDefRoot))->createChild(nodeId);
963          tmp->parse(node);
964          transformationMap_.push_back(std::make_pair(TRANS_INTERPOLATE_AXIS,tmp));
965        }
966      } while (node.goToNextElement()) ;
967      node.goToParentElement();
968    }
969  }
970
971  DEFINE_REF_FUNC(Axis,axis)
972
973   ///---------------------------------------------------------------
974
975} // namespace xios
Note: See TracBrowser for help on using the repository browser.