source: XIOS/trunk/src/node/axis.cpp @ 829

Last change on this file since 829 was 829, checked in by mhnguyen, 8 years ago

Refactoring transformation code

+) On exchanging information during transformation, not only global index are sent but also local index
+) Correct a bug in distributed hash table (dht)
+) Add new type for dht
+) Clean up some redundant codes

Test
+) On Curie
+) Every test passes
+) Code runs faster in some cases (up to 30%)

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 30.7 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
27      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
28      , transformationMap_(), hasValue(false)
29   {
30   }
31
32   CAxis::CAxis(const StdString & id)
33      : CObjectTemplate<CAxis>(id)
34      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
35      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
36      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
37      , transformationMap_(), hasValue(false)
38   {
39   }
40
41   CAxis::~CAxis(void)
42   { /* Ne rien faire de plus */ }
43
44   ///---------------------------------------------------------------
45
46   const std::set<StdString> & CAxis::getRelFiles(void) const
47   {
48      return (this->relFiles);
49   }
50
51   bool CAxis::IsWritten(const StdString & filename) const
52   {
53      return (this->relFiles.find(filename) != this->relFiles.end());
54   }
55
56   bool CAxis::isWrittenCompressed(const StdString& filename) const
57   {
58      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
59   }
60
61   bool CAxis::isDistributed(void) const
62   {
63      return isDistributed_;
64   }
65
66   /*!
67    * Test whether the data defined on the axis can be outputted in a compressed way.
68    *
69    * \return true if and only if a mask was defined for this axis
70    */
71   bool CAxis::isCompressible(void) const
72   {
73      return isCompressible_;
74   }
75
76   void CAxis::addRelFile(const StdString & filename)
77   {
78      this->relFiles.insert(filename);
79   }
80
81   void CAxis::addRelFileCompressed(const StdString& filename)
82   {
83      this->relFilesCompressed.insert(filename);
84   }
85
86   //----------------------------------------------------------------
87
88   const std::vector<int>& CAxis::getIndexesToWrite(void) const
89   {
90     return indexesToWrite;
91   }
92
93   /*!
94     Returns the number of indexes written by each server.
95     \return the number of indexes written by each server
96   */
97   int CAxis::getNumberWrittenIndexes() const
98   {
99     return numberWrittenIndexes_;
100   }
101
102   /*!
103     Returns the total number of indexes written by the servers.
104     \return the total number of indexes written by the servers
105   */
106   int CAxis::getTotalNumberWrittenIndexes() const
107   {
108     return totalNumberWrittenIndexes_;
109   }
110
111   /*!
112     Returns the offset of indexes written by each server.
113     \return the offset of indexes written by each server
114   */
115   int CAxis::getOffsetWrittenIndexes() const
116   {
117     return offsetWrittenIndexes_;
118   }
119
120   //----------------------------------------------------------------
121
122   /*!
123    * Compute the minimum buffer size required to send the attributes to the server(s).
124    *
125    * \return A map associating the server rank with its minimum buffer size.
126    */
127   std::map<int, StdSize> CAxis::getAttributesBufferSize()
128   {
129     CContextClient* client = CContext::getCurrent()->client;
130
131     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
132
133     bool isNonDistributed = (n == n_glo);
134
135     if (client->isServerLeader())
136     {
137       // size estimation for sendServerAttribut
138       size_t size = 6 * sizeof(size_t);
139       // size estimation for sendNonDistributedValue
140       if (isNonDistributed)
141         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
142       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
143
144       const std::list<int>& ranks = client->getRanksServerLeader();
145       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
146       {
147         if (size > attributesSizes[*itRank])
148           attributesSizes[*itRank] = size;
149       }
150     }
151
152     if (!isNonDistributed)
153     {
154       // size estimation for sendDistributedValue
155       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
156       for (it = indSrv_.begin(); it != ite; ++it)
157       {
158         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
159         if (isCompressible_)
160           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
161
162         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
163         if (hasBounds_)
164           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
165
166         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
167         if (size > attributesSizes[it->first])
168           attributesSizes[it->first] = size;
169       }
170     }
171
172     return attributesSizes;
173   }
174
175   //----------------------------------------------------------------
176
177   StdString CAxis::GetName(void)   { return (StdString("axis")); }
178   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
179   ENodeType CAxis::GetType(void)   { return (eAxis); }
180
181   //----------------------------------------------------------------
182
183   CAxis* CAxis::createAxis()
184   {
185     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
186     return axis;
187   }
188
189   void CAxis::fillInValues(const CArray<double,1>& values)
190   {
191     this->value = values;
192   }
193
194   void CAxis::checkAttributes(void)
195   {
196      if (this->n_glo.isEmpty())
197        ERROR("CAxis::checkAttributes(void)",
198              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
199              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
200      StdSize size = this->n_glo.getValue();
201
202      isDistributed_ = !this->begin.isEmpty() || !this->n.isEmpty();
203
204      if (!this->begin.isEmpty())
205      {
206        if (begin < 0 || begin > size - 1)
207          ERROR("CAxis::checkAttributes(void)",
208                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
209                << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
210      }
211      else this->begin.setValue(0);
212
213      if (!this->n.isEmpty())
214      {
215        if (n < 0 || n > size)
216          ERROR("CAxis::checkAttributes(void)",
217                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
218                << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
219      }
220      else this->n.setValue(size);
221
222      if (!this->value.isEmpty())
223      {
224        StdSize true_size = value.numElements();
225        if (this->n.getValue() != true_size)
226          ERROR("CAxis::checkAttributes(void)",
227                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
228                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
229        this->hasValue = true;
230      }
231
232      if (this->index.isEmpty())
233      {
234        index.resize(n);
235        for (int i = 0; i < n; ++i) index(i) = i+begin;
236      }
237
238      this->checkData();
239      this->checkZoom();
240      this->checkMask();
241      this->checkBounds();
242   }
243
244   void CAxis::checkData()
245   {
246      if (data_begin.isEmpty()) data_begin.setValue(0);
247
248      if (data_n.isEmpty())
249      {
250        data_n.setValue(n);
251      }
252      else if (data_n.getValue() < 0)
253      {
254        ERROR("CAxis::checkData(void)",
255              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
256              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
257      }
258
259      if (data_index.isEmpty())
260      {
261        data_index.resize(data_n);
262        for (int i = 0; i < data_n; ++i) data_index(i) = i;
263      }
264   }
265
266   void CAxis::checkZoom(void)
267   {
268     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
269     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
270   }
271
272   void CAxis::checkMask()
273   {
274      if (!mask.isEmpty())
275      {
276         if (mask.extent(0) != n)
277           ERROR("CAxis::checkMask(void)",
278                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
279                 << "The mask does not have the same size as the local domain." << std::endl
280                 << "Local size is " << n.getValue() << "." << std::endl
281                 << "Mask size is " << mask.extent(0) << ".");
282      }
283      else // (mask.isEmpty())
284      { // If no mask was defined, we create a default one without any masked point.
285         mask.resize(n);
286         for (int i = 0; i < n; ++i)
287         {
288           mask(i) = true;
289         }
290      }
291   }
292
293  void CAxis::checkBounds()
294  {
295    if (!bounds.isEmpty())
296    {
297      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
298        ERROR("CAxis::checkAttributes(void)",
299              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
300              << "Axis size is " << n.getValue() << "." << std::endl
301              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
302      hasBounds_ = true;
303    }
304    else hasBounds_ = false;
305  }
306
307  void CAxis::checkEligibilityForCompressedOutput()
308  {
309    // We don't check if the mask is valid here, just if a mask has been defined at this point.
310    isCompressible_ = !mask.isEmpty();
311  }
312
313  bool CAxis::dispatchEvent(CEventServer& event)
314   {
315      if (SuperClass::dispatchEvent(event)) return true;
316      else
317      {
318        switch(event.type)
319        {
320           case EVENT_ID_SERVER_ATTRIBUT :
321             recvServerAttribut(event);
322             return true;
323             break;
324           case EVENT_ID_INDEX:
325            recvIndex(event);
326            return true;
327            break;
328          case EVENT_ID_DISTRIBUTED_VALUE:
329            recvDistributedValue(event);
330            return true;
331            break;
332          case EVENT_ID_NON_DISTRIBUTED_VALUE:
333            recvNonDistributedValue(event);
334            return true;
335            break;
336           default :
337             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
338                    << "Unknown Event");
339           return false;
340         }
341      }
342   }
343
344   void CAxis::checkAttributesOnClient()
345   {
346     if (this->areClientAttributesChecked_) return;
347
348     this->checkAttributes();
349
350     this->areClientAttributesChecked_ = true;
351   }
352
353   // Send all checked attributes to server
354   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
355                                     CServerDistributionDescription::ServerDistributionType distType)
356   {
357     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
358     CContext* context = CContext::getCurrent();
359
360     if (this->isChecked) return;
361     if (context->hasClient)
362     {
363       sendServerAttribut(globalDim, orderPositionInGrid, distType);
364       if (hasValue) sendValue(globalDim, orderPositionInGrid, distType);
365     }
366
367     this->isChecked = true;
368   }
369
370  void CAxis::sendValue(const std::vector<int>& globalDim, int orderPositionInGrid,
371                        CServerDistributionDescription::ServerDistributionType distType)
372  {
373     if (n.getValue() == n_glo.getValue())
374     {
375       sendNonDistributedValue();
376     }
377     else
378     {
379       computeConnectedServer(globalDim, orderPositionInGrid, distType);
380       sendDistributedValue();
381     }
382  }
383
384  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
385                                     CServerDistributionDescription::ServerDistributionType distType)
386  {
387    CContext* context = CContext::getCurrent();
388    CContextClient* client = context->client;
389    int nbServer = client->serverSize;
390    int range, clientSize = client->clientSize;
391
392    size_t ni = this->n.getValue();
393    size_t ibegin = this->begin.getValue();
394    size_t zoom_end = global_zoom_begin+global_zoom_n-1;
395    size_t nZoomCount = 0;
396    size_t nbIndex = index.numElements();
397    for (size_t idx = 0; idx < nbIndex; ++idx)
398    {
399      size_t globalIndex = index(idx);
400      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
401    }
402
403    CArray<size_t,1> globalIndexAxis(nbIndex);
404    std::vector<size_t> globalAxisZoom(nZoomCount);
405    nZoomCount = 0;
406    for (size_t idx = 0; idx < nbIndex; ++idx)
407    {
408      size_t globalIndex = index(idx);
409      globalIndexAxis(idx) = globalIndex;
410      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
411      {
412        globalAxisZoom[nZoomCount] = globalIndex;
413        ++nZoomCount;
414      }
415    }
416
417    std::set<int> writtenInd;
418    if (isCompressible_)
419    {
420      for (int idx = 0; idx < data_index.numElements(); ++idx)
421      {
422        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
423
424        if (ind >= 0 && ind < ni && mask(ind))
425        {
426          ind += ibegin;
427          if (ind >= global_zoom_begin && ind <= zoom_end)
428            writtenInd.insert(ind);
429        }
430      }
431    }
432
433    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
434    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
435    CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
436    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
437    {
438      std::vector<int> nGlobAxis(1);
439      nGlobAxis[0] = n_glo.getValue();
440
441      size_t globalSizeIndex = 1, indexBegin, indexEnd;
442      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
443      indexBegin = 0;
444      for (int i = 0; i < clientSize; ++i)
445      {
446        range = globalSizeIndex / clientSize;
447        if (i < (globalSizeIndex%clientSize)) ++range;
448        if (i == client->clientRank) break;
449        indexBegin += range;
450      }
451      indexEnd = indexBegin + range - 1;
452
453      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
454      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
455      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
456      clientServerMap->computeServerIndexMapping(globalIndexAxis);
457      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
458      delete clientServerMap;
459    }
460    else
461    {
462      std::vector<size_t> globalIndexServer(n_glo.getValue());
463      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
464      {
465        globalIndexServer[idx] = idx;
466      }
467
468      for (int idx = 0; idx < nbServer; ++idx)
469      {
470        globalIndexAxisOnServer[idx] = globalIndexServer;
471      }
472    }
473
474    CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(),
475                                                         ite = globalIndexAxisOnServer.end();
476    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
477                                        iteVec = (globalAxisZoom).end();
478    indSrv_.clear();
479    indWrittenSrv_.clear();
480    for (; it != ite; ++it)
481    {
482      int rank = it->first;
483      const std::vector<size_t>& globalIndexTmp = it->second;
484      int nb = globalIndexTmp.size();
485
486      for (int i = 0; i < nb; ++i)
487      {
488        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
489        {
490          indSrv_[rank].push_back(globalIndexTmp[i]);
491        }
492
493        if (writtenInd.count(globalIndexTmp[i]))
494        {
495          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
496        }
497      }
498    }
499
500    connectedServerRank_.clear();
501    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
502      connectedServerRank_.push_back(it->first);
503    }
504
505    if (!indSrv_.empty())
506    {
507      std::map<int, vector<size_t> >::const_iterator itIndSrv  = indSrv_.begin(),
508                                                     iteIndSrv = indSrv_.end();
509      connectedServerRank_.clear();
510      for (; itIndSrv != iteIndSrv; ++itIndSrv)
511        connectedServerRank_.push_back(itIndSrv->first);
512    }
513    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
514  }
515
516  void CAxis::sendNonDistributedValue()
517  {
518    CContext* context = CContext::getCurrent();
519    CContextClient* client = context->client;
520    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
521
522    int zoom_end = global_zoom_begin + global_zoom_n - 1;
523    int nb = 0;
524    for (size_t idx = 0; idx < n; ++idx)
525    {
526      size_t globalIndex = begin + idx;
527      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
528    }
529
530    int nbWritten = 0;
531    if (isCompressible_)
532    {
533      for (int idx = 0; idx < data_index.numElements(); ++idx)
534      {
535        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
536
537        if (ind >= 0 && ind < n && mask(ind))
538        {
539          ind += begin;
540          if (ind >= global_zoom_begin && ind <= zoom_end)
541            ++nbWritten;
542        }
543      }
544    }
545
546    CArray<double,1> val(nb);
547    nb = 0;
548    for (size_t idx = 0; idx < n; ++idx)
549    {
550      size_t globalIndex = begin + idx;
551      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
552      {
553        val(nb) = value(idx);
554        ++nb;
555      }
556    }
557
558    CArray<int, 1> writtenInd(nbWritten);
559    nbWritten = 0;
560    if (isCompressible_)
561    {
562      for (int idx = 0; idx < data_index.numElements(); ++idx)
563      {
564        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
565
566        if (ind >= 0 && ind < n && mask(ind))
567        {
568          ind += begin;
569          if (ind >= global_zoom_begin && ind <= zoom_end)
570          {
571            writtenInd(nbWritten) = ind;
572            ++nbWritten;
573          }
574        }
575      }
576    }
577
578    if (client->isServerLeader())
579    {
580      std::list<CMessage> msgs;
581
582      const std::list<int>& ranks = client->getRanksServerLeader();
583      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
584      {
585        msgs.push_back(CMessage());
586        CMessage& msg = msgs.back();
587        msg << this->getId();
588        msg << val;
589        if (isCompressible_)
590          msg << writtenInd;
591        event.push(*itRank, 1, msg);
592      }
593      client->sendEvent(event);
594    }
595    else client->sendEvent(event);
596  }
597
598  void CAxis::sendDistributedValue(void)
599  {
600    int ns, n, i, j, ind, nv, idx;
601    CContext* context = CContext::getCurrent();
602    CContextClient* client=context->client;
603
604    // send value for each connected server
605    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
606    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
607
608    list<CMessage> list_msgsIndex, list_msgsVal;
609    list<CArray<int,1> > list_indi;
610    list<CArray<int,1> > list_writtenInd;
611    list<CArray<double,1> > list_val;
612    list<CArray<double,2> > list_bounds;
613
614    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
615    iteMap = indSrv_.end();
616    for (int k = 0; k < connectedServerRank_.size(); ++k)
617    {
618      int nbData = 0;
619      int rank = connectedServerRank_[k];
620      it = indSrv_.find(rank);
621      if (iteMap != it)
622        nbData = it->second.size();
623
624      list_indi.push_back(CArray<int,1>(nbData));
625      list_val.push_back(CArray<double,1>(nbData));
626
627      if (hasBounds_)
628      {
629        list_bounds.push_back(CArray<double,2>(2,nbData));
630      }
631
632      CArray<int,1>& indi = list_indi.back();
633      CArray<double,1>& val = list_val.back();
634
635      for (n = 0; n < nbData; ++n)
636      {
637        idx = static_cast<int>(it->second[n]);
638        ind = idx - begin;
639
640        val(n) = value(ind);
641        indi(n) = idx;
642
643        if (hasBounds_)
644        {
645          CArray<double,2>& boundsVal = list_bounds.back();
646          boundsVal(0, n) = bounds(0,n);
647          boundsVal(1, n) = bounds(1,n);
648        }
649      }
650
651      list_msgsIndex.push_back(CMessage());
652      list_msgsIndex.back() << this->getId() << list_indi.back();
653
654      if (isCompressible_)
655      {
656        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
657        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
658        CArray<int,1>& writtenInd = list_writtenInd.back();
659
660        for (n = 0; n < writtenInd.numElements(); ++n)
661          writtenInd(n) = writtenIndSrc[n];
662
663        list_msgsIndex.back() << writtenInd;
664      }
665
666      list_msgsVal.push_back(CMessage());
667      list_msgsVal.back() << this->getId() << list_val.back();
668
669      if (hasBounds_)
670      {
671        list_msgsVal.back() << list_bounds.back();
672      }
673
674      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
675      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
676    }
677
678    client->sendEvent(eventIndex);
679    client->sendEvent(eventVal);
680  }
681
682  void CAxis::recvIndex(CEventServer& event)
683  {
684    CAxis* axis;
685
686    list<CEventServer::SSubEvent>::iterator it;
687    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
688    {
689      CBufferIn* buffer = it->buffer;
690      string axisId;
691      *buffer >> axisId;
692      axis = get(axisId);
693      axis->recvIndex(it->rank, *buffer);
694    }
695
696    if (axis->isCompressible_)
697    {
698      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
699
700      CContextServer* server = CContext::getCurrent()->server;
701      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
702      MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
703      MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
704      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
705    }
706  }
707
708  void CAxis::recvIndex(int rank, CBufferIn& buffer)
709  {
710    buffer >> indiSrv_[rank];
711
712    if (isCompressible_)
713    {
714      CArray<int, 1> writtenIndexes;
715      buffer >> writtenIndexes;
716      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
717      for (int i = 0; i < writtenIndexes.numElements(); ++i)
718        indexesToWrite.push_back(writtenIndexes(i));
719    }
720  }
721
722  void CAxis::recvDistributedValue(CEventServer& event)
723  {
724    list<CEventServer::SSubEvent>::iterator it;
725    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
726    {
727      CBufferIn* buffer = it->buffer;
728      string axisId;
729      *buffer >> axisId;
730      get(axisId)->recvDistributedValue(it->rank, *buffer);
731    }
732  }
733
734  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
735  {
736    CArray<int,1> &indi = indiSrv_[rank];
737    CArray<double,1> val;
738    CArray<double,2> boundsVal;
739
740    buffer >> val;
741    if (hasBounds_) buffer >> boundsVal;
742
743    int i, j, ind_srv;
744    for (int ind = 0; ind < indi.numElements(); ++ind)
745    {
746      i = indi(ind);
747      ind_srv = i - zoom_begin_srv;
748      value_srv(ind_srv) = val(ind);
749      if (hasBounds_)
750      {
751        bound_srv(0,ind_srv) = boundsVal(0, ind);
752        bound_srv(1,ind_srv) = boundsVal(1, ind);
753      }
754    }
755  }
756
757   void CAxis::recvNonDistributedValue(CEventServer& event)
758  {
759    CAxis* axis;
760
761    list<CEventServer::SSubEvent>::iterator it;
762    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
763    {
764      CBufferIn* buffer = it->buffer;
765      string axisId;
766      *buffer >> axisId;
767      axis = get(axisId);
768      axis->recvNonDistributedValue(it->rank, *buffer);
769    }
770
771    if (axis->isCompressible_)
772    {
773      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
774
775      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
776      axis->offsetWrittenIndexes_ = 0;
777    }
778  }
779
780  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
781  {
782    CArray<double,1> val;
783    buffer >> val;
784
785    for (int ind = 0; ind < val.numElements(); ++ind)
786    {
787      value_srv(ind) = val(ind);
788      if (hasBounds_)
789      {
790        bound_srv(0,ind) = bounds(0,ind);
791        bound_srv(1,ind) = bounds(1,ind);
792      }
793    }
794
795    if (isCompressible_)
796    {
797      CArray<int, 1> writtenIndexes;
798      buffer >> writtenIndexes;
799      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
800      for (int i = 0; i < writtenIndexes.numElements(); ++i)
801        indexesToWrite.push_back(writtenIndexes(i));
802    }
803  }
804
805  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
806                                 CServerDistributionDescription::ServerDistributionType distType)
807  {
808    CContext* context = CContext::getCurrent();
809    CContextClient* client = context->client;
810    int nbServer = client->serverSize;
811
812    CServerDistributionDescription serverDescription(globalDim, nbServer);
813    serverDescription.computeServerDistribution();
814
815    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
816    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
817
818    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
819    if (client->isServerLeader())
820    {
821      std::list<CMessage> msgs;
822
823      const std::list<int>& ranks = client->getRanksServerLeader();
824      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
825      {
826        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
827        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
828        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
829        const int end   = begin + ni - 1;
830
831        msgs.push_back(CMessage());
832        CMessage& msg = msgs.back();
833        msg << this->getId();
834        msg << ni << begin << end;
835        msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
836        msg << isCompressible_;
837
838        event.push(*itRank,1,msg);
839      }
840      client->sendEvent(event);
841    }
842    else client->sendEvent(event);
843  }
844
845  void CAxis::recvServerAttribut(CEventServer& event)
846  {
847    CBufferIn* buffer = event.subEvents.begin()->buffer;
848    string axisId;
849    *buffer >> axisId;
850    get(axisId)->recvServerAttribut(*buffer);
851  }
852
853  void CAxis::recvServerAttribut(CBufferIn& buffer)
854  {
855    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
856
857    buffer >> ni_srv >> begin_srv >> end_srv;
858    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
859    buffer >> isCompressible_;
860    global_zoom_begin = global_zoom_begin_tmp;
861    global_zoom_n  = global_zoom_n_tmp;
862    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
863
864    zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
865    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
866    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
867
868    if (zoom_size_srv<=0)
869    {
870      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
871    }
872
873    if (n_glo == n)
874    {
875      zoom_begin_srv = global_zoom_begin;
876      zoom_end_srv   = global_zoom_end; //zoom_end;
877      zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
878    }
879    if (hasValue)
880    {
881      value_srv.resize(zoom_size_srv);
882      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
883    }
884  }
885
886  bool CAxis::hasTransformation()
887  {
888    return (!transformationMap_.empty());
889  }
890
891  void CAxis::setTransformations(const TransMapTypes& axisTrans)
892  {
893    transformationMap_ = axisTrans;
894  }
895
896  CAxis::TransMapTypes CAxis::getAllTransformations(void)
897  {
898    return transformationMap_;
899  }
900
901  /*!
902    Check the validity of all transformations applied on axis
903  This functions is called AFTER all inherited attributes are solved
904  */
905  void CAxis::checkTransformations()
906  {
907    TransMapTypes::const_iterator itb = transformationMap_.begin(), it,
908                                  ite = transformationMap_.end();
909    for (it = itb; it != ite; ++it)
910    {
911      (it->second)->checkValid(this);
912    }
913  }
914
915  void CAxis::duplicateTransformation(CAxis* src)
916  {
917    if (src->hasTransformation())
918    {
919      this->setTransformations(src->getAllTransformations());
920    }
921  }
922
923  /*!
924   * Go through the hierarchy to find the axis from which the transformations must be inherited
925   */
926  void CAxis::solveInheritanceTransformation()
927  {
928    if (hasTransformation() || !hasDirectAxisReference())
929      return;
930
931    CAxis* axis = this;
932    std::vector<CAxis*> refAxis;
933    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
934    {
935      refAxis.push_back(axis);
936      axis = axis->getDirectAxisReference();
937    }
938
939    if (axis->hasTransformation())
940      for (size_t i = 0; i < refAxis.size(); ++i)
941        refAxis[i]->setTransformations(axis->getAllTransformations());
942  }
943
944  void CAxis::parse(xml::CXMLNode & node)
945  {
946    SuperClass::parse(node);
947
948    if (node.goToChildElement())
949    {
950      StdString inverseAxisDefRoot("inverse_axis_definition");
951      StdString inverse("inverse_axis");
952      StdString zoomAxisDefRoot("zoom_axis_definition");
953      StdString zoom("zoom_axis");
954      StdString interpAxisDefRoot("interpolate_axis_definition");
955      StdString interp("interpolate_axis");
956      do
957      {
958        StdString nodeId("");
959        if (node.getAttributes().end() != node.getAttributes().find("id"))
960        { nodeId = node.getAttributes()["id"]; }
961
962        if (node.getElementName() == inverse) {
963          CInverseAxis* tmp = (CInverseAxisGroup::get(inverseAxisDefRoot))->createChild(nodeId);
964          tmp->parse(node);
965          transformationMap_.push_back(std::make_pair(TRANS_INVERSE_AXIS,tmp));
966        }
967        else if (node.getElementName() == zoom) {
968          CZoomAxis* tmp = (CZoomAxisGroup::get(zoomAxisDefRoot))->createChild(nodeId);
969          tmp->parse(node);
970          transformationMap_.push_back(std::make_pair(TRANS_ZOOM_AXIS,tmp));
971        }
972        else if (node.getElementName() == interp) {
973          CInterpolateAxis* tmp = (CInterpolateAxisGroup::get(interpAxisDefRoot))->createChild(nodeId);
974          tmp->parse(node);
975          transformationMap_.push_back(std::make_pair(TRANS_INTERPOLATE_AXIS,tmp));
976        }
977      } while (node.goToNextElement()) ;
978      node.goToParentElement();
979    }
980  }
981
982  DEFINE_REF_FUNC(Axis,axis)
983
984   ///---------------------------------------------------------------
985
986} // namespace xios
Note: See TracBrowser for help on using the repository browser.