source: XIOS/trunk/src/node/axis.cpp @ 1011

Last change on this file since 1011 was 995, checked in by mhnguyen, 8 years ago

Fixing a problem of indexed grid with only one process

+) Checking condition of non-distribution for extreme case

Test
+) On Curie
+) Work

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 32.7 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isClientAfterTransformationChecked(false)
27      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
29      , transformationMap_(), hasValue(false)
30   {
31   }
32
33   CAxis::CAxis(const StdString & id)
34      : CObjectTemplate<CAxis>(id)
35      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
36      , isClientAfterTransformationChecked(false)
37      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
38      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
39      , transformationMap_(), hasValue(false)
40   {
41   }
42
43   CAxis::~CAxis(void)
44   { /* Ne rien faire de plus */ }
45
46   std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
47   bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_);
48   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
49   {
50     m["zoom_axis"] = TRANS_ZOOM_AXIS;
51     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
52     m["inverse_axis"] = TRANS_INVERSE_AXIS;
53     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
54     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
55   }
56
57   ///---------------------------------------------------------------
58
59   const std::set<StdString> & CAxis::getRelFiles(void) const
60   {
61      return (this->relFiles);
62   }
63
64   bool CAxis::IsWritten(const StdString & filename) const
65   {
66      return (this->relFiles.find(filename) != this->relFiles.end());
67   }
68
69   bool CAxis::isWrittenCompressed(const StdString& filename) const
70   {
71      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
72   }
73
74   bool CAxis::isDistributed(void) const
75   {
76      return isDistributed_;
77   }
78
79   /*!
80    * Test whether the data defined on the axis can be outputted in a compressed way.
81    *
82    * \return true if and only if a mask was defined for this axis
83    */
84   bool CAxis::isCompressible(void) const
85   {
86      return isCompressible_;
87   }
88
89   void CAxis::addRelFile(const StdString & filename)
90   {
91      this->relFiles.insert(filename);
92   }
93
94   void CAxis::addRelFileCompressed(const StdString& filename)
95   {
96      this->relFilesCompressed.insert(filename);
97   }
98
99   //----------------------------------------------------------------
100
101   const std::vector<int>& CAxis::getIndexesToWrite(void) const
102   {
103     return indexesToWrite;
104   }
105
106   /*!
107     Returns the number of indexes written by each server.
108     \return the number of indexes written by each server
109   */
110   int CAxis::getNumberWrittenIndexes() const
111   {
112     return numberWrittenIndexes_;
113   }
114
115   /*!
116     Returns the total number of indexes written by the servers.
117     \return the total number of indexes written by the servers
118   */
119   int CAxis::getTotalNumberWrittenIndexes() const
120   {
121     return totalNumberWrittenIndexes_;
122   }
123
124   /*!
125     Returns the offset of indexes written by each server.
126     \return the offset of indexes written by each server
127   */
128   int CAxis::getOffsetWrittenIndexes() const
129   {
130     return offsetWrittenIndexes_;
131   }
132
133   //----------------------------------------------------------------
134
135   /*!
136    * Compute the minimum buffer size required to send the attributes to the server(s).
137    *
138    * \return A map associating the server rank with its minimum buffer size.
139    */
140   std::map<int, StdSize> CAxis::getAttributesBufferSize()
141   {
142     CContextClient* client = CContext::getCurrent()->client;
143
144     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
145
146     bool isNonDistributed = (n == n_glo);
147
148     if (client->isServerLeader())
149     {
150       // size estimation for sendServerAttribut
151       size_t size = 6 * sizeof(size_t);
152       // size estimation for sendNonDistributedValue
153       if (isNonDistributed)
154         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
155       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
156
157       const std::list<int>& ranks = client->getRanksServerLeader();
158       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
159       {
160         if (size > attributesSizes[*itRank])
161           attributesSizes[*itRank] = size;
162       }
163     }
164
165     if (!isNonDistributed)
166     {
167       // size estimation for sendDistributedValue
168       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
169       for (it = indSrv_.begin(); it != ite; ++it)
170       {
171         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
172         if (isCompressible_)
173           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
174
175         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
176         if (hasBounds_)
177           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
178
179         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
180         if (size > attributesSizes[it->first])
181           attributesSizes[it->first] = size;
182       }
183     }
184
185     return attributesSizes;
186   }
187
188   //----------------------------------------------------------------
189
190   StdString CAxis::GetName(void)   { return (StdString("axis")); }
191   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
192   ENodeType CAxis::GetType(void)   { return (eAxis); }
193
194   //----------------------------------------------------------------
195
196   CAxis* CAxis::createAxis()
197   {
198     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
199     return axis;
200   }
201
202   void CAxis::fillInValues(const CArray<double,1>& values)
203   {
204     this->value = values;
205   }
206
207   void CAxis::checkAttributes(void)
208   {
209      if (this->n_glo.isEmpty())
210        ERROR("CAxis::checkAttributes(void)",
211              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
212              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
213      StdSize size = this->n_glo.getValue();
214
215      if (!this->index.isEmpty())
216      {
217        if (n.isEmpty()) n = index.numElements();
218
219        // It's not so correct but if begin is not the first value of index
220        // then data on the local axis has user-defined distribution. In this case, begin has no meaning.
221        if (begin.isEmpty()) begin = index(0);         
222      }
223      else 
224      {
225        if (!this->begin.isEmpty())
226        {
227          if (begin < 0 || begin > size - 1)
228            ERROR("CAxis::checkAttributes(void)",
229                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
230                  << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
231        }
232        else this->begin.setValue(0);
233
234        if (!this->n.isEmpty())
235        {
236          if (n < 0 || n > size)
237            ERROR("CAxis::checkAttributes(void)",
238                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
239                  << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
240        }
241        else this->n.setValue(size);
242
243        {
244          index.resize(n);
245          for (int i = 0; i < n; ++i) index(i) = i+begin;
246        }
247      }
248
249      if (!this->value.isEmpty())
250      {
251        StdSize true_size = value.numElements();
252        if (this->n.getValue() != true_size)
253          ERROR("CAxis::checkAttributes(void)",
254                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
255                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
256        this->hasValue = true;
257      }
258
259      this->checkData();
260      this->checkZoom();
261      this->checkMask();
262      this->checkBounds();
263
264      isDistributed_ = (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
265                       (!this->n.isEmpty() && (this->n != this->n_glo));
266
267      // A same stupid condition to make sure that if there is only one client, axis
268      // should be considered to be distributed. This should be a temporary solution     
269      isDistributed_ |= (1 == CContext::getCurrent()->client->clientSize);
270   }
271
272   void CAxis::checkData()
273   {
274      if (data_begin.isEmpty()) data_begin.setValue(0);
275
276      if (data_n.isEmpty())
277      {
278        data_n.setValue(n);
279      }
280      else if (data_n.getValue() < 0)
281      {
282        ERROR("CAxis::checkData(void)",
283              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
284              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
285      }
286
287      if (data_index.isEmpty())
288      {
289        data_index.resize(data_n);
290        for (int i = 0; i < data_n; ++i) data_index(i) = i;
291      }
292   }
293
294   void CAxis::checkZoom(void)
295   {
296     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
297     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
298   }
299
300   void CAxis::checkMask()
301   {
302      if (!mask.isEmpty())
303      {
304         if (mask.extent(0) != n)
305           ERROR("CAxis::checkMask(void)",
306                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
307                 << "The mask does not have the same size as the local domain." << std::endl
308                 << "Local size is " << n.getValue() << "." << std::endl
309                 << "Mask size is " << mask.extent(0) << ".");
310      }
311      else // (mask.isEmpty())
312      { // If no mask was defined, we create a default one without any masked point.
313         mask.resize(n);
314         for (int i = 0; i < n; ++i)
315         {
316           mask(i) = true;
317         }
318      }
319   }
320
321  void CAxis::checkBounds()
322  {
323    if (!bounds.isEmpty())
324    {
325      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
326        ERROR("CAxis::checkAttributes(void)",
327              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
328              << "Axis size is " << n.getValue() << "." << std::endl
329              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
330      hasBounds_ = true;
331    }
332    else hasBounds_ = false;
333  }
334
335  void CAxis::checkEligibilityForCompressedOutput()
336  {
337    // We don't check if the mask is valid here, just if a mask has been defined at this point.
338    isCompressible_ = !mask.isEmpty();
339  }
340
341  bool CAxis::dispatchEvent(CEventServer& event)
342   {
343      if (SuperClass::dispatchEvent(event)) return true;
344      else
345      {
346        switch(event.type)
347        {
348           case EVENT_ID_SERVER_ATTRIBUT :
349             recvServerAttribut(event);
350             return true;
351             break;
352           case EVENT_ID_INDEX:
353            recvIndex(event);
354            return true;
355            break;
356          case EVENT_ID_DISTRIBUTED_VALUE:
357            recvDistributedValue(event);
358            return true;
359            break;
360          case EVENT_ID_NON_DISTRIBUTED_VALUE:
361            recvNonDistributedValue(event);
362            return true;
363            break;
364           default :
365             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
366                    << "Unknown Event");
367           return false;
368         }
369      }
370   }
371
372   void CAxis::checkAttributesOnClient()
373   {
374     if (this->areClientAttributesChecked_) return;
375
376     this->checkAttributes();
377
378     this->areClientAttributesChecked_ = true;
379   }
380
381   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
382                                                          CServerDistributionDescription::ServerDistributionType distType)
383   {
384     CContext* context=CContext::getCurrent() ;
385
386     if (this->isClientAfterTransformationChecked) return;
387     if (context->hasClient)
388     {
389       if (n.getValue() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
390     }
391
392     this->isClientAfterTransformationChecked = true;
393   }
394
395   // Send all checked attributes to server
396   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
397                                     CServerDistributionDescription::ServerDistributionType distType)
398   {
399     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
400     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
401     CContext* context = CContext::getCurrent();
402
403     if (this->isChecked) return;
404     if (context->hasClient)
405     {
406       sendServerAttribut(globalDim, orderPositionInGrid, distType);
407       if (hasValue) sendValue();
408     }
409
410     this->isChecked = true;
411   }
412
413  void CAxis::sendValue()
414  {
415     if (n.getValue() == n_glo.getValue())
416       sendNonDistributedValue();
417     else
418       sendDistributedValue();
419  }
420
421  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
422                                     CServerDistributionDescription::ServerDistributionType distType)
423  {
424    CContext* context = CContext::getCurrent();
425    CContextClient* client = context->client;
426    int nbServer = client->serverSize;
427    int range, clientSize = client->clientSize;
428    int rank = client->clientRank;
429
430    size_t ni = this->n.getValue();
431    size_t ibegin = this->begin.getValue();
432    size_t zoom_end = global_zoom_begin+global_zoom_n-1;
433    size_t nZoomCount = 0;
434    size_t nbIndex = index.numElements();
435    for (size_t idx = 0; idx < nbIndex; ++idx)
436    {
437      size_t globalIndex = index(idx);
438      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
439    }
440
441    CArray<size_t,1> globalIndexAxis(nbIndex);
442    std::vector<size_t> globalAxisZoom(nZoomCount);
443    nZoomCount = 0;
444    for (size_t idx = 0; idx < nbIndex; ++idx)
445    {
446      size_t globalIndex = index(idx);
447      globalIndexAxis(idx) = globalIndex;
448      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
449      {
450        globalAxisZoom[nZoomCount] = globalIndex;
451        ++nZoomCount;
452      }
453    }
454
455    std::set<int> writtenInd;
456    if (isCompressible_)
457    {
458      for (int idx = 0; idx < data_index.numElements(); ++idx)
459      {
460        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
461
462        if (ind >= 0 && ind < ni && mask(ind))
463        {
464          ind += ibegin;
465          if (ind >= global_zoom_begin && ind <= zoom_end)
466            writtenInd.insert(ind);
467        }
468      }
469    }
470
471    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
472    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
473    CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
474    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
475    {
476      std::vector<int> nGlobAxis(1);
477      nGlobAxis[0] = n_glo.getValue();
478
479      size_t globalSizeIndex = 1, indexBegin, indexEnd;
480      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
481      indexBegin = 0;
482      if (globalSizeIndex <= clientSize)
483      {
484        indexBegin = rank%globalSizeIndex;
485        indexEnd = indexBegin;
486      }
487      else
488      {
489        for (int i = 0; i < clientSize; ++i)
490        {
491          range = globalSizeIndex / clientSize;
492          if (i < (globalSizeIndex%clientSize)) ++range;
493          if (i == client->clientRank) break;
494          indexBegin += range;
495        }
496        indexEnd = indexBegin + range - 1;
497      }
498
499      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
500      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
501      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
502      clientServerMap->computeServerIndexMapping(globalIndexAxis);
503      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
504      delete clientServerMap;
505    }
506    else
507    {
508      std::vector<size_t> globalIndexServer(n_glo.getValue());
509      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
510      {
511        globalIndexServer[idx] = idx;
512      }
513
514      for (int idx = 0; idx < nbServer; ++idx)
515      {
516        globalIndexAxisOnServer[idx] = globalIndexServer;
517      }
518    }
519
520    CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(),
521                                                         ite = globalIndexAxisOnServer.end();
522    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
523                                        iteVec = (globalAxisZoom).end();
524    indSrv_.clear();
525    indWrittenSrv_.clear();
526    for (; it != ite; ++it)
527    {
528      int rank = it->first;
529      const std::vector<size_t>& globalIndexTmp = it->second;
530      int nb = globalIndexTmp.size();
531
532      for (int i = 0; i < nb; ++i)
533      {
534        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
535        {
536          indSrv_[rank].push_back(globalIndexTmp[i]);
537        }
538
539        if (writtenInd.count(globalIndexTmp[i]))
540        {
541          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
542        }
543      }
544    }
545
546    connectedServerRank_.clear();
547    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
548      connectedServerRank_.push_back(it->first);
549    }
550
551    if (!indSrv_.empty())
552    {
553      std::map<int, vector<size_t> >::const_iterator itIndSrv  = indSrv_.begin(),
554                                                     iteIndSrv = indSrv_.end();
555      connectedServerRank_.clear();
556      for (; itIndSrv != iteIndSrv; ++itIndSrv)
557        connectedServerRank_.push_back(itIndSrv->first);
558    }
559    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
560  }
561
562  void CAxis::sendNonDistributedValue()
563  {
564    CContext* context = CContext::getCurrent();
565    CContextClient* client = context->client;
566    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
567
568    int zoom_end = global_zoom_begin + global_zoom_n - 1;
569    int nb = 0;
570    for (size_t idx = 0; idx < n; ++idx)
571    {
572      size_t globalIndex = begin + idx;
573      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
574    }
575
576    int nbWritten = 0;
577    if (isCompressible_)
578    {
579      for (int idx = 0; idx < data_index.numElements(); ++idx)
580      {
581        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
582
583        if (ind >= 0 && ind < n && mask(ind))
584        {
585          ind += begin;
586          if (ind >= global_zoom_begin && ind <= zoom_end)
587            ++nbWritten;
588        }
589      }
590    }
591
592    CArray<double,1> val(nb);
593    nb = 0;
594    for (size_t idx = 0; idx < n; ++idx)
595    {
596      size_t globalIndex = begin + idx;
597      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
598      {
599        val(nb) = value(idx);
600        ++nb;
601      }
602    }
603
604    CArray<int, 1> writtenInd(nbWritten);
605    nbWritten = 0;
606    if (isCompressible_)
607    {
608      for (int idx = 0; idx < data_index.numElements(); ++idx)
609      {
610        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
611
612        if (ind >= 0 && ind < n && mask(ind))
613        {
614          ind += begin;
615          if (ind >= global_zoom_begin && ind <= zoom_end)
616          {
617            writtenInd(nbWritten) = ind;
618            ++nbWritten;
619          }
620        }
621      }
622    }
623
624    if (client->isServerLeader())
625    {
626      std::list<CMessage> msgs;
627
628      const std::list<int>& ranks = client->getRanksServerLeader();
629      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
630      {
631        msgs.push_back(CMessage());
632        CMessage& msg = msgs.back();
633        msg << this->getId();
634        msg << val;
635        if (isCompressible_)
636          msg << writtenInd;
637        event.push(*itRank, 1, msg);
638      }
639      client->sendEvent(event);
640    }
641    else client->sendEvent(event);
642  }
643
644  void CAxis::sendDistributedValue(void)
645  {
646    int ns, n, i, j, ind, nv, idx;
647    CContext* context = CContext::getCurrent();
648    CContextClient* client=context->client;
649
650    // send value for each connected server
651    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
652    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
653
654    list<CMessage> list_msgsIndex, list_msgsVal;
655    list<CArray<int,1> > list_indi;
656    list<CArray<int,1> > list_writtenInd;
657    list<CArray<double,1> > list_val;
658    list<CArray<double,2> > list_bounds;
659
660    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
661    iteMap = indSrv_.end();
662    for (int k = 0; k < connectedServerRank_.size(); ++k)
663    {
664      int nbData = 0;
665      int rank = connectedServerRank_[k];
666      it = indSrv_.find(rank);
667      if (iteMap != it)
668        nbData = it->second.size();
669
670      list_indi.push_back(CArray<int,1>(nbData));
671      list_val.push_back(CArray<double,1>(nbData));
672
673      if (hasBounds_)
674      {
675        list_bounds.push_back(CArray<double,2>(2,nbData));
676      }
677
678      CArray<int,1>& indi = list_indi.back();
679      CArray<double,1>& val = list_val.back();
680
681      for (n = 0; n < nbData; ++n)
682      {
683        idx = static_cast<int>(it->second[n]);
684        ind = idx - begin;
685
686        val(n) = value(ind);
687        indi(n) = idx;
688
689        if (hasBounds_)
690        {
691          CArray<double,2>& boundsVal = list_bounds.back();
692          boundsVal(0, n) = bounds(0,n);
693          boundsVal(1, n) = bounds(1,n);
694        }
695      }
696
697      list_msgsIndex.push_back(CMessage());
698      list_msgsIndex.back() << this->getId() << list_indi.back();
699
700      if (isCompressible_)
701      {
702        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
703        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
704        CArray<int,1>& writtenInd = list_writtenInd.back();
705
706        for (n = 0; n < writtenInd.numElements(); ++n)
707          writtenInd(n) = writtenIndSrc[n];
708
709        list_msgsIndex.back() << writtenInd;
710      }
711
712      list_msgsVal.push_back(CMessage());
713      list_msgsVal.back() << this->getId() << list_val.back();
714
715      if (hasBounds_)
716      {
717        list_msgsVal.back() << list_bounds.back();
718      }
719
720      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
721      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
722    }
723
724    client->sendEvent(eventIndex);
725    client->sendEvent(eventVal);
726  }
727
728  void CAxis::recvIndex(CEventServer& event)
729  {
730    CAxis* axis;
731
732    list<CEventServer::SSubEvent>::iterator it;
733    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
734    {
735      CBufferIn* buffer = it->buffer;
736      string axisId;
737      *buffer >> axisId;
738      axis = get(axisId);
739      axis->recvIndex(it->rank, *buffer);
740    }
741
742    if (axis->isCompressible_)
743    {
744      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
745
746      CContextServer* server = CContext::getCurrent()->server;
747      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
748      MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
749      MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
750      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
751    }
752  }
753
754  void CAxis::recvIndex(int rank, CBufferIn& buffer)
755  {
756    buffer >> indiSrv_[rank];
757
758    if (isCompressible_)
759    {
760      CArray<int, 1> writtenIndexes;
761      buffer >> writtenIndexes;
762      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
763      for (int i = 0; i < writtenIndexes.numElements(); ++i)
764        indexesToWrite.push_back(writtenIndexes(i));
765    }
766  }
767
768  void CAxis::recvDistributedValue(CEventServer& event)
769  {
770    list<CEventServer::SSubEvent>::iterator it;
771    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
772    {
773      CBufferIn* buffer = it->buffer;
774      string axisId;
775      *buffer >> axisId;
776      get(axisId)->recvDistributedValue(it->rank, *buffer);
777    }
778  }
779
780  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
781  {
782    CArray<int,1> &indi = indiSrv_[rank];
783    CArray<double,1> val;
784    CArray<double,2> boundsVal;
785
786    buffer >> val;
787    if (hasBounds_) buffer >> boundsVal;
788
789    int i, j, ind_srv;
790    for (int ind = 0; ind < indi.numElements(); ++ind)
791    {
792      i = indi(ind);
793      ind_srv = i - zoom_begin_srv;
794      value_srv(ind_srv) = val(ind);
795      if (hasBounds_)
796      {
797        bound_srv(0,ind_srv) = boundsVal(0, ind);
798        bound_srv(1,ind_srv) = boundsVal(1, ind);
799      }
800    }
801  }
802
803   void CAxis::recvNonDistributedValue(CEventServer& event)
804  {
805    CAxis* axis;
806
807    list<CEventServer::SSubEvent>::iterator it;
808    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
809    {
810      CBufferIn* buffer = it->buffer;
811      string axisId;
812      *buffer >> axisId;
813      axis = get(axisId);
814      axis->recvNonDistributedValue(it->rank, *buffer);
815    }
816
817    if (axis->isCompressible_)
818    {
819      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
820
821      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
822      axis->offsetWrittenIndexes_ = 0;
823    }
824  }
825
826  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
827  {
828    CArray<double,1> val;
829    buffer >> val;
830
831    for (int ind = 0; ind < val.numElements(); ++ind)
832    {
833      value_srv(ind) = val(ind);
834      if (hasBounds_)
835      {
836        bound_srv(0,ind) = bounds(0,ind);
837        bound_srv(1,ind) = bounds(1,ind);
838      }
839    }
840
841    if (isCompressible_)
842    {
843      CArray<int, 1> writtenIndexes;
844      buffer >> writtenIndexes;
845      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
846      for (int i = 0; i < writtenIndexes.numElements(); ++i)
847        indexesToWrite.push_back(writtenIndexes(i));
848    }
849  }
850
851  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
852                                 CServerDistributionDescription::ServerDistributionType distType)
853  {
854    CContext* context = CContext::getCurrent();
855    CContextClient* client = context->client;
856    int nbServer = client->serverSize;
857
858    CServerDistributionDescription serverDescription(globalDim, nbServer);
859    serverDescription.computeServerDistribution();
860
861    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
862    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
863
864    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
865    if (client->isServerLeader())
866    {
867      std::list<CMessage> msgs;
868
869      const std::list<int>& ranks = client->getRanksServerLeader();
870      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
871      {
872        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
873        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
874        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
875        const int end   = begin + ni - 1;
876
877        msgs.push_back(CMessage());
878        CMessage& msg = msgs.back();
879        msg << this->getId();
880        msg << ni << begin << end;
881        msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
882        msg << isCompressible_;
883
884        event.push(*itRank,1,msg);
885      }
886      client->sendEvent(event);
887    }
888    else client->sendEvent(event);
889  }
890
891  void CAxis::recvServerAttribut(CEventServer& event)
892  {
893    CBufferIn* buffer = event.subEvents.begin()->buffer;
894    string axisId;
895    *buffer >> axisId;
896    get(axisId)->recvServerAttribut(*buffer);
897  }
898
899  void CAxis::recvServerAttribut(CBufferIn& buffer)
900  {
901    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
902
903    buffer >> ni_srv >> begin_srv >> end_srv;
904    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
905    buffer >> isCompressible_;
906    global_zoom_begin = global_zoom_begin_tmp;
907    global_zoom_n  = global_zoom_n_tmp;
908    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
909
910    zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
911    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
912    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
913
914    if (zoom_size_srv<=0)
915    {
916      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
917    }
918
919    if (n_glo == n)
920    {
921      zoom_begin_srv = global_zoom_begin;
922      zoom_end_srv   = global_zoom_end; //zoom_end;
923      zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
924    }
925    if (hasValue)
926    {
927      value_srv.resize(zoom_size_srv);
928      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
929    }
930  }
931
932  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
933  {
934    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
935    return transformationMap_.back().second;
936  }
937
938  bool CAxis::hasTransformation()
939  {
940    return (!transformationMap_.empty());
941  }
942
943  void CAxis::setTransformations(const TransMapTypes& axisTrans)
944  {
945    transformationMap_ = axisTrans;
946  }
947
948  CAxis::TransMapTypes CAxis::getAllTransformations(void)
949  {
950    return transformationMap_;
951  }
952
953  /*!
954    Check the validity of all transformations applied on axis
955  This functions is called AFTER all inherited attributes are solved
956  */
957  void CAxis::checkTransformations()
958  {
959    TransMapTypes::const_iterator itb = transformationMap_.begin(), it,
960                                  ite = transformationMap_.end();
961//    for (it = itb; it != ite; ++it)
962//    {
963//      (it->second)->checkValid(this);
964//    }
965  }
966
967  void CAxis::duplicateTransformation(CAxis* src)
968  {
969    if (src->hasTransformation())
970    {
971      this->setTransformations(src->getAllTransformations());
972    }
973  }
974
975  /*!
976   * Go through the hierarchy to find the axis from which the transformations must be inherited
977   */
978  void CAxis::solveInheritanceTransformation()
979  {
980    if (hasTransformation() || !hasDirectAxisReference())
981      return;
982
983    CAxis* axis = this;
984    std::vector<CAxis*> refAxis;
985    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
986    {
987      refAxis.push_back(axis);
988      axis = axis->getDirectAxisReference();
989    }
990
991    if (axis->hasTransformation())
992      for (size_t i = 0; i < refAxis.size(); ++i)
993        refAxis[i]->setTransformations(axis->getAllTransformations());
994  }
995
996  void CAxis::parse(xml::CXMLNode & node)
997  {
998    SuperClass::parse(node);
999
1000    if (node.goToChildElement())
1001    {
1002      StdString nodeElementName;
1003      do
1004      {
1005        StdString nodeId("");
1006        if (node.getAttributes().end() != node.getAttributes().find("id"))
1007        { nodeId = node.getAttributes()["id"]; }
1008
1009        nodeElementName = node.getElementName();
1010        std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_.end(), it;
1011        it = transformationMapList_.find(nodeElementName);
1012        if (ite != it)
1013        {
1014          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1015                                                                                                               nodeId,
1016                                                                                                               &node)));
1017        }
1018        else
1019        {
1020          ERROR("void CAxis::parse(xml::CXMLNode & node)",
1021                << "The transformation " << nodeElementName << " has not been supported yet.");
1022        }
1023      } while (node.goToNextElement()) ;
1024      node.goToParentElement();
1025    }
1026  }
1027
1028  DEFINE_REF_FUNC(Axis,axis)
1029
1030   ///---------------------------------------------------------------
1031
1032} // namespace xios
Note: See TracBrowser for help on using the repository browser.