source: XIOS/dev/branch_yushan/src/node/axis.cpp @ 1109

Last change on this file since 1109 was 1109, checked in by yushan, 7 years ago

test_omp OK

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 33.5 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isClientAfterTransformationChecked(false)
27      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
29      , transformationMap_(), hasValue(false)
30   {
31   }
32
33   CAxis::CAxis(const StdString & id)
34      : CObjectTemplate<CAxis>(id)
35      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
36      , isClientAfterTransformationChecked(false)
37      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
38      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
39      , transformationMap_(), hasValue(false)
40   {
41   }
42
43   CAxis::~CAxis(void)
44   { /* Ne rien faire de plus */ }
45
46   //std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
47   //bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_);
48
49   std::map<StdString, ETranformationType> *CAxis::transformationMapList_ptr = 0; //new std::map<StdString, ETranformationType>(); 
50   //bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_ptr);
51
52   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
53   {
54     m["zoom_axis"] = TRANS_ZOOM_AXIS;
55     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
56     m["inverse_axis"] = TRANS_INVERSE_AXIS;
57     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
58     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
59   }
60
61
62   bool CAxis::initializeTransformationMap()
63   {
64     if(CAxis::transformationMapList_ptr == 0) CAxis::transformationMapList_ptr = new std::map<StdString, ETranformationType>();
65     (*CAxis::transformationMapList_ptr)["zoom_axis"]        = TRANS_ZOOM_AXIS;
66     (*CAxis::transformationMapList_ptr)["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
67     (*CAxis::transformationMapList_ptr)["inverse_axis"]     = TRANS_INVERSE_AXIS;
68     (*CAxis::transformationMapList_ptr)["reduce_domain"]    = TRANS_REDUCE_DOMAIN_TO_AXIS;
69     (*CAxis::transformationMapList_ptr)["extract_domain"]   = TRANS_EXTRACT_DOMAIN_TO_AXIS;
70   }
71
72
73   ///---------------------------------------------------------------
74
75   const std::set<StdString> & CAxis::getRelFiles(void) const
76   {
77      return (this->relFiles);
78   }
79
80   bool CAxis::IsWritten(const StdString & filename) const
81   {
82      return (this->relFiles.find(filename) != this->relFiles.end());
83   }
84
85   bool CAxis::isWrittenCompressed(const StdString& filename) const
86   {
87      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
88   }
89
90   bool CAxis::isDistributed(void) const
91   {
92      return isDistributed_;
93   }
94
95   /*!
96    * Test whether the data defined on the axis can be outputted in a compressed way.
97    *
98    * \return true if and only if a mask was defined for this axis
99    */
100   bool CAxis::isCompressible(void) const
101   {
102      return isCompressible_;
103   }
104
105   void CAxis::addRelFile(const StdString & filename)
106   {
107      this->relFiles.insert(filename);
108   }
109
110   void CAxis::addRelFileCompressed(const StdString& filename)
111   {
112      this->relFilesCompressed.insert(filename);
113   }
114
115   //----------------------------------------------------------------
116
117   const std::vector<int>& CAxis::getIndexesToWrite(void) const
118   {
119     return indexesToWrite;
120   }
121
122   /*!
123     Returns the number of indexes written by each server.
124     \return the number of indexes written by each server
125   */
126   int CAxis::getNumberWrittenIndexes() const
127   {
128     return numberWrittenIndexes_;
129   }
130
131   /*!
132     Returns the total number of indexes written by the servers.
133     \return the total number of indexes written by the servers
134   */
135   int CAxis::getTotalNumberWrittenIndexes() const
136   {
137     return totalNumberWrittenIndexes_;
138   }
139
140   /*!
141     Returns the offset of indexes written by each server.
142     \return the offset of indexes written by each server
143   */
144   int CAxis::getOffsetWrittenIndexes() const
145   {
146     return offsetWrittenIndexes_;
147   }
148
149   //----------------------------------------------------------------
150
151   /*!
152    * Compute the minimum buffer size required to send the attributes to the server(s).
153    *
154    * \return A map associating the server rank with its minimum buffer size.
155    */
156   std::map<int, StdSize> CAxis::getAttributesBufferSize()
157   {
158     CContextClient* client = CContext::getCurrent()->client;
159
160     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
161
162     bool isNonDistributed = (n == n_glo);
163
164     if (client->isServerLeader())
165     {
166       // size estimation for sendServerAttribut
167       size_t size = 6 * sizeof(size_t);
168       // size estimation for sendNonDistributedValue
169       if (isNonDistributed)
170         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
171       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
172
173       const std::list<int>& ranks = client->getRanksServerLeader();
174       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
175       {
176         if (size > attributesSizes[*itRank])
177           attributesSizes[*itRank] = size;
178       }
179     }
180
181     if (!isNonDistributed)
182     {
183       // size estimation for sendDistributedValue
184       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
185       for (it = indSrv_.begin(); it != ite; ++it)
186       {
187         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
188         if (isCompressible_)
189           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
190
191         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
192         if (hasBounds_)
193           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
194
195         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
196         if (size > attributesSizes[it->first])
197           attributesSizes[it->first] = size;
198       }
199     }
200
201     return attributesSizes;
202   }
203
204   //----------------------------------------------------------------
205
206   StdString CAxis::GetName(void)   { return (StdString("axis")); }
207   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
208   ENodeType CAxis::GetType(void)   { return (eAxis); }
209
210   //----------------------------------------------------------------
211
212   CAxis* CAxis::createAxis()
213   {
214     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
215     return axis;
216   }
217
218   void CAxis::fillInValues(const CArray<double,1>& values)
219   {
220     this->value = values;
221   }
222
223   void CAxis::checkAttributes(void)
224   {
225      if (this->n_glo.isEmpty())
226        ERROR("CAxis::checkAttributes(void)",
227              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
228              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
229      StdSize size = this->n_glo.getValue();
230
231      if (!this->index.isEmpty())
232      {
233        if (n.isEmpty()) n = index.numElements();
234
235        // It's not so correct but if begin is not the first value of index
236        // then data on the local axis has user-defined distribution. In this case, begin has no meaning.
237        if (begin.isEmpty()) begin = index(0);         
238      }
239      else 
240      {
241        if (!this->begin.isEmpty())
242        {
243          if (begin < 0 || begin > size - 1)
244            ERROR("CAxis::checkAttributes(void)",
245                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
246                  << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
247        }
248        else this->begin.setValue(0);
249
250        if (!this->n.isEmpty())
251        {
252          if (n < 0 || n > size)
253            ERROR("CAxis::checkAttributes(void)",
254                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
255                  << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
256        }
257        else this->n.setValue(size);
258
259        {
260          index.resize(n);
261          for (int i = 0; i < n; ++i) index(i) = i+begin;
262        }
263      }
264
265      if (!this->value.isEmpty())
266      {
267        StdSize true_size = value.numElements();
268        if (this->n.getValue() != true_size)
269          ERROR("CAxis::checkAttributes(void)",
270                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
271                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
272        this->hasValue = true;
273      }
274
275      this->checkData();
276      this->checkZoom();
277      this->checkMask();
278      this->checkBounds();
279
280      isDistributed_ = (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
281                       (!this->n.isEmpty() && (this->n != this->n_glo));
282   }
283
284   void CAxis::checkData()
285   {
286      if (data_begin.isEmpty()) data_begin.setValue(0);
287
288      if (data_n.isEmpty())
289      {
290        data_n.setValue(n);
291      }
292      else if (data_n.getValue() < 0)
293      {
294        ERROR("CAxis::checkData(void)",
295              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
296              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
297      }
298
299      if (data_index.isEmpty())
300      {
301        data_index.resize(data_n);
302        for (int i = 0; i < data_n; ++i) data_index(i) = i;
303      }
304   }
305
306   void CAxis::checkZoom(void)
307   {
308     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
309     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
310   }
311
312   void CAxis::checkMask()
313   {
314      if (!mask.isEmpty())
315      {
316         if (mask.extent(0) != n)
317           ERROR("CAxis::checkMask(void)",
318                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
319                 << "The mask does not have the same size as the local domain." << std::endl
320                 << "Local size is " << n.getValue() << "." << std::endl
321                 << "Mask size is " << mask.extent(0) << ".");
322      }
323      else // (mask.isEmpty())
324      { // If no mask was defined, we create a default one without any masked point.
325         mask.resize(n);
326         for (int i = 0; i < n; ++i)
327         {
328           mask(i) = true;
329         }
330      }
331   }
332
333  void CAxis::checkBounds()
334  {
335    if (!bounds.isEmpty())
336    {
337      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
338        ERROR("CAxis::checkAttributes(void)",
339              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
340              << "Axis size is " << n.getValue() << "." << std::endl
341              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
342      hasBounds_ = true;
343    }
344    else hasBounds_ = false;
345  }
346
347  void CAxis::checkEligibilityForCompressedOutput()
348  {
349    // We don't check if the mask is valid here, just if a mask has been defined at this point.
350    isCompressible_ = !mask.isEmpty();
351  }
352
353  bool CAxis::dispatchEvent(CEventServer& event)
354   {
355      if (SuperClass::dispatchEvent(event)) return true;
356      else
357      {
358        switch(event.type)
359        {
360           case EVENT_ID_SERVER_ATTRIBUT :
361             recvServerAttribut(event);
362             return true;
363             break;
364           case EVENT_ID_INDEX:
365            recvIndex(event);
366            return true;
367            break;
368          case EVENT_ID_DISTRIBUTED_VALUE:
369            recvDistributedValue(event);
370            return true;
371            break;
372          case EVENT_ID_NON_DISTRIBUTED_VALUE:
373            recvNonDistributedValue(event);
374            return true;
375            break;
376           default :
377             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
378                    << "Unknown Event");
379           return false;
380         }
381      }
382   }
383
384   void CAxis::checkAttributesOnClient()
385   {
386     if (this->areClientAttributesChecked_) return;
387
388     this->checkAttributes();
389
390     this->areClientAttributesChecked_ = true;
391   }
392
393   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
394                                                          CServerDistributionDescription::ServerDistributionType distType)
395   {
396     CContext* context=CContext::getCurrent() ;
397
398     if (this->isClientAfterTransformationChecked) return;
399     if (context->hasClient)
400     {
401       if (n.getValue() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
402     }
403
404     this->isClientAfterTransformationChecked = true;
405   }
406
407   // Send all checked attributes to server
408   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
409                                     CServerDistributionDescription::ServerDistributionType distType)
410   {
411     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
412     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
413     CContext* context = CContext::getCurrent();
414
415     if (this->isChecked) return;
416     if (context->hasClient)
417     {
418       sendServerAttribut(globalDim, orderPositionInGrid, distType);
419       if (hasValue) sendValue();
420     }
421
422     this->isChecked = true;
423   }
424
425  void CAxis::sendValue()
426  {
427     if (n.getValue() == n_glo.getValue())
428       sendNonDistributedValue();
429     else
430       sendDistributedValue();
431  }
432
433  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
434                                     CServerDistributionDescription::ServerDistributionType distType)
435  {
436    CContext* context = CContext::getCurrent();
437    CContextClient* client = context->client;
438    int nbServer = client->serverSize;
439    int range, clientSize = client->clientSize;
440    int rank = client->clientRank;
441
442    size_t ni = this->n.getValue();
443    size_t ibegin = this->begin.getValue();
444    size_t zoom_end = global_zoom_begin+global_zoom_n-1;
445    size_t nZoomCount = 0;
446    size_t nbIndex = index.numElements();
447    for (size_t idx = 0; idx < nbIndex; ++idx)
448    {
449      size_t globalIndex = index(idx);
450      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
451    }
452
453    CArray<size_t,1> globalIndexAxis(nbIndex);
454    std::vector<size_t> globalAxisZoom(nZoomCount);
455    nZoomCount = 0;
456    for (size_t idx = 0; idx < nbIndex; ++idx)
457    {
458      size_t globalIndex = index(idx);
459      globalIndexAxis(idx) = globalIndex;
460      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
461      {
462        globalAxisZoom[nZoomCount] = globalIndex;
463        ++nZoomCount;
464      }
465    }
466
467    std::set<int> writtenInd;
468    if (isCompressible_)
469    {
470      for (int idx = 0; idx < data_index.numElements(); ++idx)
471      {
472        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
473
474        if (ind >= 0 && ind < ni && mask(ind))
475        {
476          ind += ibegin;
477          if (ind >= global_zoom_begin && ind <= zoom_end)
478            writtenInd.insert(ind);
479        }
480      }
481    }
482
483    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
484    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
485    CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
486    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
487    {
488      std::vector<int> nGlobAxis(1);
489      nGlobAxis[0] = n_glo.getValue();
490
491      size_t globalSizeIndex = 1, indexBegin, indexEnd;
492      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
493      indexBegin = 0;
494      if (globalSizeIndex <= clientSize)
495      {
496        indexBegin = rank%globalSizeIndex;
497        indexEnd = indexBegin;
498      }
499      else
500      {
501        for (int i = 0; i < clientSize; ++i)
502        {
503          range = globalSizeIndex / clientSize;
504          if (i < (globalSizeIndex%clientSize)) ++range;
505          if (i == client->clientRank) break;
506          indexBegin += range;
507        }
508        indexEnd = indexBegin + range - 1;
509      }
510
511      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
512      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
513      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
514      clientServerMap->computeServerIndexMapping(globalIndexAxis);
515      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
516      delete clientServerMap;
517    }
518    else
519    {
520      std::vector<size_t> globalIndexServer(n_glo.getValue());
521      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
522      {
523        globalIndexServer[idx] = idx;
524      }
525
526      for (int idx = 0; idx < nbServer; ++idx)
527      {
528        globalIndexAxisOnServer[idx] = globalIndexServer;
529      }
530    }
531
532    CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(),
533                                                         ite = globalIndexAxisOnServer.end();
534    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
535                                        iteVec = (globalAxisZoom).end();
536    indSrv_.clear();
537    indWrittenSrv_.clear();
538    for (; it != ite; ++it)
539    {
540      int rank = it->first;
541      const std::vector<size_t>& globalIndexTmp = it->second;
542      int nb = globalIndexTmp.size();
543
544      for (int i = 0; i < nb; ++i)
545      {
546        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
547        {
548          indSrv_[rank].push_back(globalIndexTmp[i]);
549        }
550
551        if (writtenInd.count(globalIndexTmp[i]))
552        {
553          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
554        }
555      }
556    }
557
558    connectedServerRank_.clear();
559    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
560      connectedServerRank_.push_back(it->first);
561    }
562
563    if (!indSrv_.empty())
564    {
565      std::map<int, vector<size_t> >::const_iterator itIndSrv  = indSrv_.begin(),
566                                                     iteIndSrv = indSrv_.end();
567      connectedServerRank_.clear();
568      for (; itIndSrv != iteIndSrv; ++itIndSrv)
569        connectedServerRank_.push_back(itIndSrv->first);
570    }
571    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
572  }
573
574  void CAxis::sendNonDistributedValue()
575  {
576    CContext* context = CContext::getCurrent();
577    CContextClient* client = context->client;
578    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
579
580    int zoom_end = global_zoom_begin + global_zoom_n - 1;
581    int nb = 0;
582    for (size_t idx = 0; idx < n; ++idx)
583    {
584      size_t globalIndex = begin + idx;
585      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
586    }
587
588    int nbWritten = 0;
589    if (isCompressible_)
590    {
591      for (int idx = 0; idx < data_index.numElements(); ++idx)
592      {
593        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
594
595        if (ind >= 0 && ind < n && mask(ind))
596        {
597          ind += begin;
598          if (ind >= global_zoom_begin && ind <= zoom_end)
599            ++nbWritten;
600        }
601      }
602    }
603
604    CArray<double,1> val(nb);
605    nb = 0;
606    for (size_t idx = 0; idx < n; ++idx)
607    {
608      size_t globalIndex = begin + idx;
609      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
610      {
611        val(nb) = value(idx);
612        ++nb;
613      }
614    }
615
616    CArray<int, 1> writtenInd(nbWritten);
617    nbWritten = 0;
618    if (isCompressible_)
619    {
620      for (int idx = 0; idx < data_index.numElements(); ++idx)
621      {
622        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
623
624        if (ind >= 0 && ind < n && mask(ind))
625        {
626          ind += begin;
627          if (ind >= global_zoom_begin && ind <= zoom_end)
628          {
629            writtenInd(nbWritten) = ind;
630            ++nbWritten;
631          }
632        }
633      }
634    }
635
636    if (client->isServerLeader())
637    {
638      std::list<CMessage> msgs;
639
640      const std::list<int>& ranks = client->getRanksServerLeader();
641      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
642      {
643        msgs.push_back(CMessage());
644        CMessage& msg = msgs.back();
645        msg << this->getId();
646        msg << val;
647        if (isCompressible_)
648          msg << writtenInd;
649        event.push(*itRank, 1, msg);
650      }
651      client->sendEvent(event);
652    }
653    else client->sendEvent(event);
654  }
655
656  void CAxis::sendDistributedValue(void)
657  {
658    int ns, n, i, j, ind, nv, idx;
659    CContext* context = CContext::getCurrent();
660    CContextClient* client=context->client;
661
662    // send value for each connected server
663    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
664    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
665
666    list<CMessage> list_msgsIndex, list_msgsVal;
667    list<CArray<int,1> > list_indi;
668    list<CArray<int,1> > list_writtenInd;
669    list<CArray<double,1> > list_val;
670    list<CArray<double,2> > list_bounds;
671
672    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
673    iteMap = indSrv_.end();
674    for (int k = 0; k < connectedServerRank_.size(); ++k)
675    {
676      int nbData = 0;
677      int rank = connectedServerRank_[k];
678      it = indSrv_.find(rank);
679      if (iteMap != it)
680        nbData = it->second.size();
681
682      list_indi.push_back(CArray<int,1>(nbData));
683      list_val.push_back(CArray<double,1>(nbData));
684
685      if (hasBounds_)
686      {
687        list_bounds.push_back(CArray<double,2>(2,nbData));
688      }
689
690      CArray<int,1>& indi = list_indi.back();
691      CArray<double,1>& val = list_val.back();
692
693      for (n = 0; n < nbData; ++n)
694      {
695        idx = static_cast<int>(it->second[n]);
696        ind = idx - begin;
697
698        val(n) = value(ind);
699        indi(n) = idx;
700
701        if (hasBounds_)
702        {
703          CArray<double,2>& boundsVal = list_bounds.back();
704          boundsVal(0, n) = bounds(0,n);
705          boundsVal(1, n) = bounds(1,n);
706        }
707      }
708
709      list_msgsIndex.push_back(CMessage());
710      list_msgsIndex.back() << this->getId() << list_indi.back();
711
712      if (isCompressible_)
713      {
714        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
715        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
716        CArray<int,1>& writtenInd = list_writtenInd.back();
717
718        for (n = 0; n < writtenInd.numElements(); ++n)
719          writtenInd(n) = writtenIndSrc[n];
720
721        list_msgsIndex.back() << writtenInd;
722      }
723
724      list_msgsVal.push_back(CMessage());
725      list_msgsVal.back() << this->getId() << list_val.back();
726
727      if (hasBounds_)
728      {
729        list_msgsVal.back() << list_bounds.back();
730      }
731
732      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
733      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
734    }
735
736    client->sendEvent(eventIndex);
737    client->sendEvent(eventVal);
738  }
739
740  void CAxis::recvIndex(CEventServer& event)
741  {
742    CAxis* axis;
743
744    list<CEventServer::SSubEvent>::iterator it;
745    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
746    {
747      CBufferIn* buffer = it->buffer;
748      string axisId;
749      *buffer >> axisId;
750      axis = get(axisId);
751      axis->recvIndex(it->rank, *buffer);
752    }
753
754    if (axis->isCompressible_)
755    {
756      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
757
758      CContextServer* server = CContext::getCurrent()->server;
759      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
760      ep_lib::MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
761      ep_lib::MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
762      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
763    }
764  }
765
766  void CAxis::recvIndex(int rank, CBufferIn& buffer)
767  {
768    buffer >> indiSrv_[rank];
769
770    if (isCompressible_)
771    {
772      CArray<int, 1> writtenIndexes;
773      buffer >> writtenIndexes;
774      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
775      for (int i = 0; i < writtenIndexes.numElements(); ++i)
776        indexesToWrite.push_back(writtenIndexes(i));
777    }
778  }
779
780  void CAxis::recvDistributedValue(CEventServer& event)
781  {
782    list<CEventServer::SSubEvent>::iterator it;
783    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
784    {
785      CBufferIn* buffer = it->buffer;
786      string axisId;
787      *buffer >> axisId;
788      get(axisId)->recvDistributedValue(it->rank, *buffer);
789    }
790  }
791
792  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
793  {
794    CArray<int,1> &indi = indiSrv_[rank];
795    CArray<double,1> val;
796    CArray<double,2> boundsVal;
797
798    buffer >> val;
799    if (hasBounds_) buffer >> boundsVal;
800
801    int i, j, ind_srv;
802    for (int ind = 0; ind < indi.numElements(); ++ind)
803    {
804      i = indi(ind);
805      ind_srv = i - zoom_begin_srv;
806      value_srv(ind_srv) = val(ind);
807      if (hasBounds_)
808      {
809        bound_srv(0,ind_srv) = boundsVal(0, ind);
810        bound_srv(1,ind_srv) = boundsVal(1, ind);
811      }
812    }
813  }
814
815   void CAxis::recvNonDistributedValue(CEventServer& event)
816  {
817    CAxis* axis;
818
819    list<CEventServer::SSubEvent>::iterator it;
820    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
821    {
822      CBufferIn* buffer = it->buffer;
823      string axisId;
824      *buffer >> axisId;
825      axis = get(axisId);
826      axis->recvNonDistributedValue(it->rank, *buffer);
827    }
828
829    if (axis->isCompressible_)
830    {
831      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
832
833      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
834      axis->offsetWrittenIndexes_ = 0;
835    }
836  }
837
838  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
839  {
840    CArray<double,1> val;
841    buffer >> val;
842
843    for (int ind = 0; ind < val.numElements(); ++ind)
844    {
845      value_srv(ind) = val(ind);
846      if (hasBounds_)
847      {
848        bound_srv(0,ind) = bounds(0,ind);
849        bound_srv(1,ind) = bounds(1,ind);
850      }
851    }
852
853    if (isCompressible_)
854    {
855      CArray<int, 1> writtenIndexes;
856      buffer >> writtenIndexes;
857      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
858      for (int i = 0; i < writtenIndexes.numElements(); ++i)
859        indexesToWrite.push_back(writtenIndexes(i));
860    }
861  }
862
863  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
864                                 CServerDistributionDescription::ServerDistributionType distType)
865  {
866    CContext* context = CContext::getCurrent();
867    CContextClient* client = context->client;
868    int nbServer = client->serverSize;
869
870    CServerDistributionDescription serverDescription(globalDim, nbServer);
871    serverDescription.computeServerDistribution();
872
873    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
874    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
875
876    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
877    if (client->isServerLeader())
878    {
879      std::list<CMessage> msgs;
880
881      const std::list<int>& ranks = client->getRanksServerLeader();
882      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
883      {
884        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
885        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
886        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
887        const int end   = begin + ni - 1;
888
889        msgs.push_back(CMessage());
890        CMessage& msg = msgs.back();
891        msg << this->getId();
892        msg << ni << begin << end;
893        msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
894        msg << isCompressible_;
895
896        event.push(*itRank,1,msg);
897      }
898      client->sendEvent(event);
899    }
900    else client->sendEvent(event);
901  }
902
903  void CAxis::recvServerAttribut(CEventServer& event)
904  {
905    CBufferIn* buffer = event.subEvents.begin()->buffer;
906    string axisId;
907    *buffer >> axisId;
908    get(axisId)->recvServerAttribut(*buffer);
909  }
910
911  void CAxis::recvServerAttribut(CBufferIn& buffer)
912  {
913    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
914
915    buffer >> ni_srv >> begin_srv >> end_srv;
916    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
917    buffer >> isCompressible_;
918    global_zoom_begin = global_zoom_begin_tmp;
919    global_zoom_n  = global_zoom_n_tmp;
920    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
921
922    zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
923    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
924    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
925
926    if (zoom_size_srv<=0)
927    {
928      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
929    }
930
931    if (n_glo == n)
932    {
933      zoom_begin_srv = global_zoom_begin;
934      zoom_end_srv   = global_zoom_end; //zoom_end;
935      zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
936    }
937    if (hasValue)
938    {
939      value_srv.resize(zoom_size_srv);
940      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
941    }
942  }
943
944  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
945  {
946    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
947    return transformationMap_.back().second;
948  }
949
950  bool CAxis::hasTransformation()
951  {
952    return (!transformationMap_.empty());
953  }
954
955  void CAxis::setTransformations(const TransMapTypes& axisTrans)
956  {
957    transformationMap_ = axisTrans;
958  }
959
960  CAxis::TransMapTypes CAxis::getAllTransformations(void)
961  {
962    return transformationMap_;
963  }
964
965  /*!
966    Check the validity of all transformations applied on axis
967  This functions is called AFTER all inherited attributes are solved
968  */
969  void CAxis::checkTransformations()
970  {
971    TransMapTypes::const_iterator itb = transformationMap_.begin(), it,
972                                  ite = transformationMap_.end();
973//    for (it = itb; it != ite; ++it)
974//    {
975//      (it->second)->checkValid(this);
976//    }
977  }
978
979  void CAxis::duplicateTransformation(CAxis* src)
980  {
981    if (src->hasTransformation())
982    {
983      this->setTransformations(src->getAllTransformations());
984    }
985  }
986
987  /*!
988   * Go through the hierarchy to find the axis from which the transformations must be inherited
989   */
990  void CAxis::solveInheritanceTransformation()
991  {
992    if (hasTransformation() || !hasDirectAxisReference())
993      return;
994
995    CAxis* axis = this;
996    std::vector<CAxis*> refAxis;
997    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
998    {
999      refAxis.push_back(axis);
1000      axis = axis->getDirectAxisReference();
1001    }
1002
1003    if (axis->hasTransformation())
1004      for (size_t i = 0; i < refAxis.size(); ++i)
1005        refAxis[i]->setTransformations(axis->getAllTransformations());
1006  }
1007
1008  void CAxis::parse(xml::CXMLNode & node)
1009  {
1010    SuperClass::parse(node);
1011
1012    if (node.goToChildElement())
1013    {
1014      StdString nodeElementName;
1015      do
1016      {
1017        StdString nodeId("");
1018        if (node.getAttributes().end() != node.getAttributes().find("id"))
1019        { nodeId = node.getAttributes()["id"]; }
1020
1021        nodeElementName = node.getElementName();
1022
1023        if(transformationMapList_ptr == 0) initializeTransformationMap();
1024        //transformationMapList_ptr = new std::map<StdString, ETranformationType>();
1025
1026        std::map<StdString, ETranformationType>::const_iterator ite = (*CAxis::transformationMapList_ptr).end(), it;
1027        it = (*CAxis::transformationMapList_ptr).find(nodeElementName);
1028        if (ite != it)
1029        {
1030          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1031                                                                                                               nodeId,
1032                                                                                                               &node)));
1033        }
1034        else
1035        {
1036          ERROR("void CAxis::parse(xml::CXMLNode & node)",
1037                << "The transformation " << nodeElementName << " has not been supported yet.");
1038        }
1039      } while (node.goToNextElement()) ;
1040      node.goToParentElement();
1041    }
1042  }
1043
1044  DEFINE_REF_FUNC(Axis,axis)
1045
1046   ///---------------------------------------------------------------
1047
1048} // namespace xios
1049
Note: See TracBrowser for help on using the repository browser.