source: XIOS/trunk/src/node/axis.cpp @ 935

Last change on this file since 935 was 927, checked in by rlacroix, 8 years ago

Fix: Ensure that the buffer sizes needed to send the axis attributes are evaluated before actually sending any data.

This should avoid unexpected buffer requests (which would not cause any real issue most of the times since XIOS tries to recover from those).

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 31.8 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isClientAfterTransformationChecked(false)
27      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
29      , transformationMap_(), hasValue(false)
30   {
31   }
32
33   CAxis::CAxis(const StdString & id)
34      : CObjectTemplate<CAxis>(id)
35      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
36      , isClientAfterTransformationChecked(false)
37      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
38      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
39      , transformationMap_(), hasValue(false)
40   {
41   }
42
43   CAxis::~CAxis(void)
44   { /* Ne rien faire de plus */ }
45
46   std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
47   bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_);
48   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
49   {
50     m["zoom_axis"] = TRANS_ZOOM_AXIS;
51     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
52     m["inverse_axis"] = TRANS_INVERSE_AXIS;
53     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
54     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
55   }
56
57   ///---------------------------------------------------------------
58
59   const std::set<StdString> & CAxis::getRelFiles(void) const
60   {
61      return (this->relFiles);
62   }
63
64   bool CAxis::IsWritten(const StdString & filename) const
65   {
66      return (this->relFiles.find(filename) != this->relFiles.end());
67   }
68
69   bool CAxis::isWrittenCompressed(const StdString& filename) const
70   {
71      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
72   }
73
74   bool CAxis::isDistributed(void) const
75   {
76      return isDistributed_;
77   }
78
79   /*!
80    * Test whether the data defined on the axis can be outputted in a compressed way.
81    *
82    * \return true if and only if a mask was defined for this axis
83    */
84   bool CAxis::isCompressible(void) const
85   {
86      return isCompressible_;
87   }
88
89   void CAxis::addRelFile(const StdString & filename)
90   {
91      this->relFiles.insert(filename);
92   }
93
94   void CAxis::addRelFileCompressed(const StdString& filename)
95   {
96      this->relFilesCompressed.insert(filename);
97   }
98
99   //----------------------------------------------------------------
100
101   const std::vector<int>& CAxis::getIndexesToWrite(void) const
102   {
103     return indexesToWrite;
104   }
105
106   /*!
107     Returns the number of indexes written by each server.
108     \return the number of indexes written by each server
109   */
110   int CAxis::getNumberWrittenIndexes() const
111   {
112     return numberWrittenIndexes_;
113   }
114
115   /*!
116     Returns the total number of indexes written by the servers.
117     \return the total number of indexes written by the servers
118   */
119   int CAxis::getTotalNumberWrittenIndexes() const
120   {
121     return totalNumberWrittenIndexes_;
122   }
123
124   /*!
125     Returns the offset of indexes written by each server.
126     \return the offset of indexes written by each server
127   */
128   int CAxis::getOffsetWrittenIndexes() const
129   {
130     return offsetWrittenIndexes_;
131   }
132
133   //----------------------------------------------------------------
134
135   /*!
136    * Compute the minimum buffer size required to send the attributes to the server(s).
137    *
138    * \return A map associating the server rank with its minimum buffer size.
139    */
140   std::map<int, StdSize> CAxis::getAttributesBufferSize()
141   {
142     CContextClient* client = CContext::getCurrent()->client;
143
144     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
145
146     bool isNonDistributed = (n == n_glo);
147
148     if (client->isServerLeader())
149     {
150       // size estimation for sendServerAttribut
151       size_t size = 6 * sizeof(size_t);
152       // size estimation for sendNonDistributedValue
153       if (isNonDistributed)
154         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
155       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
156
157       const std::list<int>& ranks = client->getRanksServerLeader();
158       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
159       {
160         if (size > attributesSizes[*itRank])
161           attributesSizes[*itRank] = size;
162       }
163     }
164
165     if (!isNonDistributed)
166     {
167       // size estimation for sendDistributedValue
168       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
169       for (it = indSrv_.begin(); it != ite; ++it)
170       {
171         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
172         if (isCompressible_)
173           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
174
175         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
176         if (hasBounds_)
177           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
178
179         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
180         if (size > attributesSizes[it->first])
181           attributesSizes[it->first] = size;
182       }
183     }
184
185     return attributesSizes;
186   }
187
188   //----------------------------------------------------------------
189
190   StdString CAxis::GetName(void)   { return (StdString("axis")); }
191   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
192   ENodeType CAxis::GetType(void)   { return (eAxis); }
193
194   //----------------------------------------------------------------
195
196   CAxis* CAxis::createAxis()
197   {
198     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
199     return axis;
200   }
201
202   void CAxis::fillInValues(const CArray<double,1>& values)
203   {
204     this->value = values;
205   }
206
207   void CAxis::checkAttributes(void)
208   {
209      if (this->n_glo.isEmpty())
210        ERROR("CAxis::checkAttributes(void)",
211              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
212              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
213      StdSize size = this->n_glo.getValue();
214
215      isDistributed_ = !this->begin.isEmpty() || !this->n.isEmpty();
216
217      if (!this->begin.isEmpty())
218      {
219        if (begin < 0 || begin > size - 1)
220          ERROR("CAxis::checkAttributes(void)",
221                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
222                << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
223      }
224      else this->begin.setValue(0);
225
226      if (!this->n.isEmpty())
227      {
228        if (n < 0 || n > size)
229          ERROR("CAxis::checkAttributes(void)",
230                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
231                << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
232      }
233      else this->n.setValue(size);
234
235      if (!this->value.isEmpty())
236      {
237        StdSize true_size = value.numElements();
238        if (this->n.getValue() != true_size)
239          ERROR("CAxis::checkAttributes(void)",
240                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
241                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
242        this->hasValue = true;
243      }
244
245      if (this->index.isEmpty())
246      {
247        index.resize(n);
248        for (int i = 0; i < n; ++i) index(i) = i+begin;
249      }
250
251      this->checkData();
252      this->checkZoom();
253      this->checkMask();
254      this->checkBounds();
255   }
256
257   void CAxis::checkData()
258   {
259      if (data_begin.isEmpty()) data_begin.setValue(0);
260
261      if (data_n.isEmpty())
262      {
263        data_n.setValue(n);
264      }
265      else if (data_n.getValue() < 0)
266      {
267        ERROR("CAxis::checkData(void)",
268              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
269              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
270      }
271
272      if (data_index.isEmpty())
273      {
274        data_index.resize(data_n);
275        for (int i = 0; i < data_n; ++i) data_index(i) = i;
276      }
277   }
278
279   void CAxis::checkZoom(void)
280   {
281     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
282     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
283   }
284
285   void CAxis::checkMask()
286   {
287      if (!mask.isEmpty())
288      {
289         if (mask.extent(0) != n)
290           ERROR("CAxis::checkMask(void)",
291                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
292                 << "The mask does not have the same size as the local domain." << std::endl
293                 << "Local size is " << n.getValue() << "." << std::endl
294                 << "Mask size is " << mask.extent(0) << ".");
295      }
296      else // (mask.isEmpty())
297      { // If no mask was defined, we create a default one without any masked point.
298         mask.resize(n);
299         for (int i = 0; i < n; ++i)
300         {
301           mask(i) = true;
302         }
303      }
304   }
305
306  void CAxis::checkBounds()
307  {
308    if (!bounds.isEmpty())
309    {
310      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
311        ERROR("CAxis::checkAttributes(void)",
312              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
313              << "Axis size is " << n.getValue() << "." << std::endl
314              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
315      hasBounds_ = true;
316    }
317    else hasBounds_ = false;
318  }
319
320  void CAxis::checkEligibilityForCompressedOutput()
321  {
322    // We don't check if the mask is valid here, just if a mask has been defined at this point.
323    isCompressible_ = !mask.isEmpty();
324  }
325
326  bool CAxis::dispatchEvent(CEventServer& event)
327   {
328      if (SuperClass::dispatchEvent(event)) return true;
329      else
330      {
331        switch(event.type)
332        {
333           case EVENT_ID_SERVER_ATTRIBUT :
334             recvServerAttribut(event);
335             return true;
336             break;
337           case EVENT_ID_INDEX:
338            recvIndex(event);
339            return true;
340            break;
341          case EVENT_ID_DISTRIBUTED_VALUE:
342            recvDistributedValue(event);
343            return true;
344            break;
345          case EVENT_ID_NON_DISTRIBUTED_VALUE:
346            recvNonDistributedValue(event);
347            return true;
348            break;
349           default :
350             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
351                    << "Unknown Event");
352           return false;
353         }
354      }
355   }
356
357   void CAxis::checkAttributesOnClient()
358   {
359     if (this->areClientAttributesChecked_) return;
360
361     this->checkAttributes();
362
363     this->areClientAttributesChecked_ = true;
364   }
365
366   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
367                                                          CServerDistributionDescription::ServerDistributionType distType)
368   {
369     CContext* context=CContext::getCurrent() ;
370
371     if (this->isClientAfterTransformationChecked) return;
372     if (context->hasClient)
373     {
374       if (n.getValue() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
375     }
376
377     this->isClientAfterTransformationChecked = true;
378   }
379
380   // Send all checked attributes to server
381   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
382                                     CServerDistributionDescription::ServerDistributionType distType)
383   {
384     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
385     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
386     CContext* context = CContext::getCurrent();
387
388     if (this->isChecked) return;
389     if (context->hasClient)
390     {
391       sendServerAttribut(globalDim, orderPositionInGrid, distType);
392       if (hasValue) sendValue();
393     }
394
395     this->isChecked = true;
396   }
397
398  void CAxis::sendValue()
399  {
400     if (n.getValue() == n_glo.getValue())
401       sendNonDistributedValue();
402     else
403       sendDistributedValue();
404  }
405
406  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
407                                     CServerDistributionDescription::ServerDistributionType distType)
408  {
409    CContext* context = CContext::getCurrent();
410    CContextClient* client = context->client;
411    int nbServer = client->serverSize;
412    int range, clientSize = client->clientSize;
413    int rank = client->clientRank;
414
415    size_t ni = this->n.getValue();
416    size_t ibegin = this->begin.getValue();
417    size_t zoom_end = global_zoom_begin+global_zoom_n-1;
418    size_t nZoomCount = 0;
419    size_t nbIndex = index.numElements();
420    for (size_t idx = 0; idx < nbIndex; ++idx)
421    {
422      size_t globalIndex = index(idx);
423      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
424    }
425
426    CArray<size_t,1> globalIndexAxis(nbIndex);
427    std::vector<size_t> globalAxisZoom(nZoomCount);
428    nZoomCount = 0;
429    for (size_t idx = 0; idx < nbIndex; ++idx)
430    {
431      size_t globalIndex = index(idx);
432      globalIndexAxis(idx) = globalIndex;
433      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
434      {
435        globalAxisZoom[nZoomCount] = globalIndex;
436        ++nZoomCount;
437      }
438    }
439
440    std::set<int> writtenInd;
441    if (isCompressible_)
442    {
443      for (int idx = 0; idx < data_index.numElements(); ++idx)
444      {
445        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
446
447        if (ind >= 0 && ind < ni && mask(ind))
448        {
449          ind += ibegin;
450          if (ind >= global_zoom_begin && ind <= zoom_end)
451            writtenInd.insert(ind);
452        }
453      }
454    }
455
456    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
457    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
458    CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
459    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
460    {
461      std::vector<int> nGlobAxis(1);
462      nGlobAxis[0] = n_glo.getValue();
463
464      size_t globalSizeIndex = 1, indexBegin, indexEnd;
465      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
466      indexBegin = 0;
467      if (globalSizeIndex <= clientSize)
468      {
469        indexBegin = rank%globalSizeIndex;
470        indexEnd = indexBegin;
471      }
472      else
473      {
474        for (int i = 0; i < clientSize; ++i)
475        {
476          range = globalSizeIndex / clientSize;
477          if (i < (globalSizeIndex%clientSize)) ++range;
478          if (i == client->clientRank) break;
479          indexBegin += range;
480        }
481        indexEnd = indexBegin + range - 1;
482      }
483
484      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
485      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
486      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
487      clientServerMap->computeServerIndexMapping(globalIndexAxis);
488      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
489      delete clientServerMap;
490    }
491    else
492    {
493      std::vector<size_t> globalIndexServer(n_glo.getValue());
494      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
495      {
496        globalIndexServer[idx] = idx;
497      }
498
499      for (int idx = 0; idx < nbServer; ++idx)
500      {
501        globalIndexAxisOnServer[idx] = globalIndexServer;
502      }
503    }
504
505    CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(),
506                                                         ite = globalIndexAxisOnServer.end();
507    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
508                                        iteVec = (globalAxisZoom).end();
509    indSrv_.clear();
510    indWrittenSrv_.clear();
511    for (; it != ite; ++it)
512    {
513      int rank = it->first;
514      const std::vector<size_t>& globalIndexTmp = it->second;
515      int nb = globalIndexTmp.size();
516
517      for (int i = 0; i < nb; ++i)
518      {
519        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
520        {
521          indSrv_[rank].push_back(globalIndexTmp[i]);
522        }
523
524        if (writtenInd.count(globalIndexTmp[i]))
525        {
526          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
527        }
528      }
529    }
530
531    connectedServerRank_.clear();
532    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
533      connectedServerRank_.push_back(it->first);
534    }
535
536    if (!indSrv_.empty())
537    {
538      std::map<int, vector<size_t> >::const_iterator itIndSrv  = indSrv_.begin(),
539                                                     iteIndSrv = indSrv_.end();
540      connectedServerRank_.clear();
541      for (; itIndSrv != iteIndSrv; ++itIndSrv)
542        connectedServerRank_.push_back(itIndSrv->first);
543    }
544    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
545  }
546
547  void CAxis::sendNonDistributedValue()
548  {
549    CContext* context = CContext::getCurrent();
550    CContextClient* client = context->client;
551    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
552
553    int zoom_end = global_zoom_begin + global_zoom_n - 1;
554    int nb = 0;
555    for (size_t idx = 0; idx < n; ++idx)
556    {
557      size_t globalIndex = begin + idx;
558      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
559    }
560
561    int nbWritten = 0;
562    if (isCompressible_)
563    {
564      for (int idx = 0; idx < data_index.numElements(); ++idx)
565      {
566        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
567
568        if (ind >= 0 && ind < n && mask(ind))
569        {
570          ind += begin;
571          if (ind >= global_zoom_begin && ind <= zoom_end)
572            ++nbWritten;
573        }
574      }
575    }
576
577    CArray<double,1> val(nb);
578    nb = 0;
579    for (size_t idx = 0; idx < n; ++idx)
580    {
581      size_t globalIndex = begin + idx;
582      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
583      {
584        val(nb) = value(idx);
585        ++nb;
586      }
587    }
588
589    CArray<int, 1> writtenInd(nbWritten);
590    nbWritten = 0;
591    if (isCompressible_)
592    {
593      for (int idx = 0; idx < data_index.numElements(); ++idx)
594      {
595        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
596
597        if (ind >= 0 && ind < n && mask(ind))
598        {
599          ind += begin;
600          if (ind >= global_zoom_begin && ind <= zoom_end)
601          {
602            writtenInd(nbWritten) = ind;
603            ++nbWritten;
604          }
605        }
606      }
607    }
608
609    if (client->isServerLeader())
610    {
611      std::list<CMessage> msgs;
612
613      const std::list<int>& ranks = client->getRanksServerLeader();
614      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
615      {
616        msgs.push_back(CMessage());
617        CMessage& msg = msgs.back();
618        msg << this->getId();
619        msg << val;
620        if (isCompressible_)
621          msg << writtenInd;
622        event.push(*itRank, 1, msg);
623      }
624      client->sendEvent(event);
625    }
626    else client->sendEvent(event);
627  }
628
629  void CAxis::sendDistributedValue(void)
630  {
631    int ns, n, i, j, ind, nv, idx;
632    CContext* context = CContext::getCurrent();
633    CContextClient* client=context->client;
634
635    // send value for each connected server
636    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
637    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
638
639    list<CMessage> list_msgsIndex, list_msgsVal;
640    list<CArray<int,1> > list_indi;
641    list<CArray<int,1> > list_writtenInd;
642    list<CArray<double,1> > list_val;
643    list<CArray<double,2> > list_bounds;
644
645    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
646    iteMap = indSrv_.end();
647    for (int k = 0; k < connectedServerRank_.size(); ++k)
648    {
649      int nbData = 0;
650      int rank = connectedServerRank_[k];
651      it = indSrv_.find(rank);
652      if (iteMap != it)
653        nbData = it->second.size();
654
655      list_indi.push_back(CArray<int,1>(nbData));
656      list_val.push_back(CArray<double,1>(nbData));
657
658      if (hasBounds_)
659      {
660        list_bounds.push_back(CArray<double,2>(2,nbData));
661      }
662
663      CArray<int,1>& indi = list_indi.back();
664      CArray<double,1>& val = list_val.back();
665
666      for (n = 0; n < nbData; ++n)
667      {
668        idx = static_cast<int>(it->second[n]);
669        ind = idx - begin;
670
671        val(n) = value(ind);
672        indi(n) = idx;
673
674        if (hasBounds_)
675        {
676          CArray<double,2>& boundsVal = list_bounds.back();
677          boundsVal(0, n) = bounds(0,n);
678          boundsVal(1, n) = bounds(1,n);
679        }
680      }
681
682      list_msgsIndex.push_back(CMessage());
683      list_msgsIndex.back() << this->getId() << list_indi.back();
684
685      if (isCompressible_)
686      {
687        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
688        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
689        CArray<int,1>& writtenInd = list_writtenInd.back();
690
691        for (n = 0; n < writtenInd.numElements(); ++n)
692          writtenInd(n) = writtenIndSrc[n];
693
694        list_msgsIndex.back() << writtenInd;
695      }
696
697      list_msgsVal.push_back(CMessage());
698      list_msgsVal.back() << this->getId() << list_val.back();
699
700      if (hasBounds_)
701      {
702        list_msgsVal.back() << list_bounds.back();
703      }
704
705      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
706      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
707    }
708
709    client->sendEvent(eventIndex);
710    client->sendEvent(eventVal);
711  }
712
713  void CAxis::recvIndex(CEventServer& event)
714  {
715    CAxis* axis;
716
717    list<CEventServer::SSubEvent>::iterator it;
718    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
719    {
720      CBufferIn* buffer = it->buffer;
721      string axisId;
722      *buffer >> axisId;
723      axis = get(axisId);
724      axis->recvIndex(it->rank, *buffer);
725    }
726
727    if (axis->isCompressible_)
728    {
729      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
730
731      CContextServer* server = CContext::getCurrent()->server;
732      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
733      MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
734      MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
735      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
736    }
737  }
738
739  void CAxis::recvIndex(int rank, CBufferIn& buffer)
740  {
741    buffer >> indiSrv_[rank];
742
743    if (isCompressible_)
744    {
745      CArray<int, 1> writtenIndexes;
746      buffer >> writtenIndexes;
747      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
748      for (int i = 0; i < writtenIndexes.numElements(); ++i)
749        indexesToWrite.push_back(writtenIndexes(i));
750    }
751  }
752
753  void CAxis::recvDistributedValue(CEventServer& event)
754  {
755    list<CEventServer::SSubEvent>::iterator it;
756    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
757    {
758      CBufferIn* buffer = it->buffer;
759      string axisId;
760      *buffer >> axisId;
761      get(axisId)->recvDistributedValue(it->rank, *buffer);
762    }
763  }
764
765  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
766  {
767    CArray<int,1> &indi = indiSrv_[rank];
768    CArray<double,1> val;
769    CArray<double,2> boundsVal;
770
771    buffer >> val;
772    if (hasBounds_) buffer >> boundsVal;
773
774    int i, j, ind_srv;
775    for (int ind = 0; ind < indi.numElements(); ++ind)
776    {
777      i = indi(ind);
778      ind_srv = i - zoom_begin_srv;
779      value_srv(ind_srv) = val(ind);
780      if (hasBounds_)
781      {
782        bound_srv(0,ind_srv) = boundsVal(0, ind);
783        bound_srv(1,ind_srv) = boundsVal(1, ind);
784      }
785    }
786  }
787
788   void CAxis::recvNonDistributedValue(CEventServer& event)
789  {
790    CAxis* axis;
791
792    list<CEventServer::SSubEvent>::iterator it;
793    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
794    {
795      CBufferIn* buffer = it->buffer;
796      string axisId;
797      *buffer >> axisId;
798      axis = get(axisId);
799      axis->recvNonDistributedValue(it->rank, *buffer);
800    }
801
802    if (axis->isCompressible_)
803    {
804      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
805
806      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
807      axis->offsetWrittenIndexes_ = 0;
808    }
809  }
810
811  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
812  {
813    CArray<double,1> val;
814    buffer >> val;
815
816    for (int ind = 0; ind < val.numElements(); ++ind)
817    {
818      value_srv(ind) = val(ind);
819      if (hasBounds_)
820      {
821        bound_srv(0,ind) = bounds(0,ind);
822        bound_srv(1,ind) = bounds(1,ind);
823      }
824    }
825
826    if (isCompressible_)
827    {
828      CArray<int, 1> writtenIndexes;
829      buffer >> writtenIndexes;
830      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
831      for (int i = 0; i < writtenIndexes.numElements(); ++i)
832        indexesToWrite.push_back(writtenIndexes(i));
833    }
834  }
835
836  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
837                                 CServerDistributionDescription::ServerDistributionType distType)
838  {
839    CContext* context = CContext::getCurrent();
840    CContextClient* client = context->client;
841    int nbServer = client->serverSize;
842
843    CServerDistributionDescription serverDescription(globalDim, nbServer);
844    serverDescription.computeServerDistribution();
845
846    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
847    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
848
849    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
850    if (client->isServerLeader())
851    {
852      std::list<CMessage> msgs;
853
854      const std::list<int>& ranks = client->getRanksServerLeader();
855      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
856      {
857        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
858        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
859        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
860        const int end   = begin + ni - 1;
861
862        msgs.push_back(CMessage());
863        CMessage& msg = msgs.back();
864        msg << this->getId();
865        msg << ni << begin << end;
866        msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
867        msg << isCompressible_;
868
869        event.push(*itRank,1,msg);
870      }
871      client->sendEvent(event);
872    }
873    else client->sendEvent(event);
874  }
875
876  void CAxis::recvServerAttribut(CEventServer& event)
877  {
878    CBufferIn* buffer = event.subEvents.begin()->buffer;
879    string axisId;
880    *buffer >> axisId;
881    get(axisId)->recvServerAttribut(*buffer);
882  }
883
884  void CAxis::recvServerAttribut(CBufferIn& buffer)
885  {
886    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
887
888    buffer >> ni_srv >> begin_srv >> end_srv;
889    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
890    buffer >> isCompressible_;
891    global_zoom_begin = global_zoom_begin_tmp;
892    global_zoom_n  = global_zoom_n_tmp;
893    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
894
895    zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
896    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
897    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
898
899    if (zoom_size_srv<=0)
900    {
901      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
902    }
903
904    if (n_glo == n)
905    {
906      zoom_begin_srv = global_zoom_begin;
907      zoom_end_srv   = global_zoom_end; //zoom_end;
908      zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
909    }
910    if (hasValue)
911    {
912      value_srv.resize(zoom_size_srv);
913      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
914    }
915  }
916
917  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
918  {
919    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
920    return transformationMap_.back().second;
921  }
922
923  bool CAxis::hasTransformation()
924  {
925    return (!transformationMap_.empty());
926  }
927
928  void CAxis::setTransformations(const TransMapTypes& axisTrans)
929  {
930    transformationMap_ = axisTrans;
931  }
932
933  CAxis::TransMapTypes CAxis::getAllTransformations(void)
934  {
935    return transformationMap_;
936  }
937
938  /*!
939    Check the validity of all transformations applied on axis
940  This functions is called AFTER all inherited attributes are solved
941  */
942  void CAxis::checkTransformations()
943  {
944    TransMapTypes::const_iterator itb = transformationMap_.begin(), it,
945                                  ite = transformationMap_.end();
946//    for (it = itb; it != ite; ++it)
947//    {
948//      (it->second)->checkValid(this);
949//    }
950  }
951
952  void CAxis::duplicateTransformation(CAxis* src)
953  {
954    if (src->hasTransformation())
955    {
956      this->setTransformations(src->getAllTransformations());
957    }
958  }
959
960  /*!
961   * Go through the hierarchy to find the axis from which the transformations must be inherited
962   */
963  void CAxis::solveInheritanceTransformation()
964  {
965    if (hasTransformation() || !hasDirectAxisReference())
966      return;
967
968    CAxis* axis = this;
969    std::vector<CAxis*> refAxis;
970    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
971    {
972      refAxis.push_back(axis);
973      axis = axis->getDirectAxisReference();
974    }
975
976    if (axis->hasTransformation())
977      for (size_t i = 0; i < refAxis.size(); ++i)
978        refAxis[i]->setTransformations(axis->getAllTransformations());
979  }
980
981  void CAxis::parse(xml::CXMLNode & node)
982  {
983    SuperClass::parse(node);
984
985    if (node.goToChildElement())
986    {
987      StdString nodeElementName;
988      do
989      {
990        StdString nodeId("");
991        if (node.getAttributes().end() != node.getAttributes().find("id"))
992        { nodeId = node.getAttributes()["id"]; }
993
994        nodeElementName = node.getElementName();
995        std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_.end(), it;
996        it = transformationMapList_.find(nodeElementName);
997        if (ite != it)
998        {
999          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1000                                                                                                               nodeId,
1001                                                                                                               &node)));
1002        }
1003      } while (node.goToNextElement()) ;
1004      node.goToParentElement();
1005    }
1006  }
1007
1008  DEFINE_REF_FUNC(Axis,axis)
1009
1010   ///---------------------------------------------------------------
1011
1012} // namespace xios
Note: See TracBrowser for help on using the repository browser.