source: XIOS/trunk/src/node/axis.cpp @ 851

Last change on this file since 851 was 836, checked in by mhnguyen, 8 years ago

Exposing transformation to Fortran interface

+) Export zoom and axis transformation to Fortran interface

Test
+) On Curie
+) All work

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 31.0 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
27      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
28      , transformationMap_(), hasValue(false)
29   {
30   }
31
32   CAxis::CAxis(const StdString & id)
33      : CObjectTemplate<CAxis>(id)
34      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
35      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
36      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
37      , transformationMap_(), hasValue(false)
38   {
39   }
40
41   CAxis::~CAxis(void)
42   { /* Ne rien faire de plus */ }
43
44   std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
45   bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_);
46   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
47   {
48     m["zoom_axis"] = TRANS_ZOOM_AXIS;
49     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
50     m["inverse_axis"] = TRANS_INVERSE_AXIS;
51   }
52
53   ///---------------------------------------------------------------
54
55   const std::set<StdString> & CAxis::getRelFiles(void) const
56   {
57      return (this->relFiles);
58   }
59
60   bool CAxis::IsWritten(const StdString & filename) const
61   {
62      return (this->relFiles.find(filename) != this->relFiles.end());
63   }
64
65   bool CAxis::isWrittenCompressed(const StdString& filename) const
66   {
67      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
68   }
69
70   bool CAxis::isDistributed(void) const
71   {
72      return isDistributed_;
73   }
74
75   /*!
76    * Test whether the data defined on the axis can be outputted in a compressed way.
77    *
78    * \return true if and only if a mask was defined for this axis
79    */
80   bool CAxis::isCompressible(void) const
81   {
82      return isCompressible_;
83   }
84
85   void CAxis::addRelFile(const StdString & filename)
86   {
87      this->relFiles.insert(filename);
88   }
89
90   void CAxis::addRelFileCompressed(const StdString& filename)
91   {
92      this->relFilesCompressed.insert(filename);
93   }
94
95   //----------------------------------------------------------------
96
97   const std::vector<int>& CAxis::getIndexesToWrite(void) const
98   {
99     return indexesToWrite;
100   }
101
102   /*!
103     Returns the number of indexes written by each server.
104     \return the number of indexes written by each server
105   */
106   int CAxis::getNumberWrittenIndexes() const
107   {
108     return numberWrittenIndexes_;
109   }
110
111   /*!
112     Returns the total number of indexes written by the servers.
113     \return the total number of indexes written by the servers
114   */
115   int CAxis::getTotalNumberWrittenIndexes() const
116   {
117     return totalNumberWrittenIndexes_;
118   }
119
120   /*!
121     Returns the offset of indexes written by each server.
122     \return the offset of indexes written by each server
123   */
124   int CAxis::getOffsetWrittenIndexes() const
125   {
126     return offsetWrittenIndexes_;
127   }
128
129   //----------------------------------------------------------------
130
131   /*!
132    * Compute the minimum buffer size required to send the attributes to the server(s).
133    *
134    * \return A map associating the server rank with its minimum buffer size.
135    */
136   std::map<int, StdSize> CAxis::getAttributesBufferSize()
137   {
138     CContextClient* client = CContext::getCurrent()->client;
139
140     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
141
142     bool isNonDistributed = (n == n_glo);
143
144     if (client->isServerLeader())
145     {
146       // size estimation for sendServerAttribut
147       size_t size = 6 * sizeof(size_t);
148       // size estimation for sendNonDistributedValue
149       if (isNonDistributed)
150         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
151       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
152
153       const std::list<int>& ranks = client->getRanksServerLeader();
154       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
155       {
156         if (size > attributesSizes[*itRank])
157           attributesSizes[*itRank] = size;
158       }
159     }
160
161     if (!isNonDistributed)
162     {
163       // size estimation for sendDistributedValue
164       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
165       for (it = indSrv_.begin(); it != ite; ++it)
166       {
167         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
168         if (isCompressible_)
169           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
170
171         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
172         if (hasBounds_)
173           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
174
175         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
176         if (size > attributesSizes[it->first])
177           attributesSizes[it->first] = size;
178       }
179     }
180
181     return attributesSizes;
182   }
183
184   //----------------------------------------------------------------
185
186   StdString CAxis::GetName(void)   { return (StdString("axis")); }
187   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
188   ENodeType CAxis::GetType(void)   { return (eAxis); }
189
190   //----------------------------------------------------------------
191
192   CAxis* CAxis::createAxis()
193   {
194     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
195     return axis;
196   }
197
198   void CAxis::fillInValues(const CArray<double,1>& values)
199   {
200     this->value = values;
201   }
202
203   void CAxis::checkAttributes(void)
204   {
205      if (this->n_glo.isEmpty())
206        ERROR("CAxis::checkAttributes(void)",
207              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
208              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
209      StdSize size = this->n_glo.getValue();
210
211      isDistributed_ = !this->begin.isEmpty() || !this->n.isEmpty();
212
213      if (!this->begin.isEmpty())
214      {
215        if (begin < 0 || begin > size - 1)
216          ERROR("CAxis::checkAttributes(void)",
217                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
218                << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
219      }
220      else this->begin.setValue(0);
221
222      if (!this->n.isEmpty())
223      {
224        if (n < 0 || n > size)
225          ERROR("CAxis::checkAttributes(void)",
226                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
227                << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
228      }
229      else this->n.setValue(size);
230
231      if (!this->value.isEmpty())
232      {
233        StdSize true_size = value.numElements();
234        if (this->n.getValue() != true_size)
235          ERROR("CAxis::checkAttributes(void)",
236                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
237                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
238        this->hasValue = true;
239      }
240
241      if (this->index.isEmpty())
242      {
243        index.resize(n);
244        for (int i = 0; i < n; ++i) index(i) = i+begin;
245      }
246
247      this->checkData();
248      this->checkZoom();
249      this->checkMask();
250      this->checkBounds();
251   }
252
253   void CAxis::checkData()
254   {
255      if (data_begin.isEmpty()) data_begin.setValue(0);
256
257      if (data_n.isEmpty())
258      {
259        data_n.setValue(n);
260      }
261      else if (data_n.getValue() < 0)
262      {
263        ERROR("CAxis::checkData(void)",
264              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
265              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
266      }
267
268      if (data_index.isEmpty())
269      {
270        data_index.resize(data_n);
271        for (int i = 0; i < data_n; ++i) data_index(i) = i;
272      }
273   }
274
275   void CAxis::checkZoom(void)
276   {
277     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
278     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
279   }
280
281   void CAxis::checkMask()
282   {
283      if (!mask.isEmpty())
284      {
285         if (mask.extent(0) != n)
286           ERROR("CAxis::checkMask(void)",
287                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
288                 << "The mask does not have the same size as the local domain." << std::endl
289                 << "Local size is " << n.getValue() << "." << std::endl
290                 << "Mask size is " << mask.extent(0) << ".");
291      }
292      else // (mask.isEmpty())
293      { // If no mask was defined, we create a default one without any masked point.
294         mask.resize(n);
295         for (int i = 0; i < n; ++i)
296         {
297           mask(i) = true;
298         }
299      }
300   }
301
302  void CAxis::checkBounds()
303  {
304    if (!bounds.isEmpty())
305    {
306      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
307        ERROR("CAxis::checkAttributes(void)",
308              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
309              << "Axis size is " << n.getValue() << "." << std::endl
310              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
311      hasBounds_ = true;
312    }
313    else hasBounds_ = false;
314  }
315
316  void CAxis::checkEligibilityForCompressedOutput()
317  {
318    // We don't check if the mask is valid here, just if a mask has been defined at this point.
319    isCompressible_ = !mask.isEmpty();
320  }
321
322  bool CAxis::dispatchEvent(CEventServer& event)
323   {
324      if (SuperClass::dispatchEvent(event)) return true;
325      else
326      {
327        switch(event.type)
328        {
329           case EVENT_ID_SERVER_ATTRIBUT :
330             recvServerAttribut(event);
331             return true;
332             break;
333           case EVENT_ID_INDEX:
334            recvIndex(event);
335            return true;
336            break;
337          case EVENT_ID_DISTRIBUTED_VALUE:
338            recvDistributedValue(event);
339            return true;
340            break;
341          case EVENT_ID_NON_DISTRIBUTED_VALUE:
342            recvNonDistributedValue(event);
343            return true;
344            break;
345           default :
346             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
347                    << "Unknown Event");
348           return false;
349         }
350      }
351   }
352
353   void CAxis::checkAttributesOnClient()
354   {
355     if (this->areClientAttributesChecked_) return;
356
357     this->checkAttributes();
358
359     this->areClientAttributesChecked_ = true;
360   }
361
362   // Send all checked attributes to server
363   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
364                                     CServerDistributionDescription::ServerDistributionType distType)
365   {
366     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
367     CContext* context = CContext::getCurrent();
368
369     if (this->isChecked) return;
370     if (context->hasClient)
371     {
372       sendServerAttribut(globalDim, orderPositionInGrid, distType);
373       if (hasValue) sendValue(globalDim, orderPositionInGrid, distType);
374     }
375
376     this->isChecked = true;
377   }
378
379  void CAxis::sendValue(const std::vector<int>& globalDim, int orderPositionInGrid,
380                        CServerDistributionDescription::ServerDistributionType distType)
381  {
382     if (n.getValue() == n_glo.getValue())
383     {
384       sendNonDistributedValue();
385     }
386     else
387     {
388       computeConnectedServer(globalDim, orderPositionInGrid, distType);
389       sendDistributedValue();
390     }
391  }
392
393  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
394                                     CServerDistributionDescription::ServerDistributionType distType)
395  {
396    CContext* context = CContext::getCurrent();
397    CContextClient* client = context->client;
398    int nbServer = client->serverSize;
399    int range, clientSize = client->clientSize;
400
401    size_t ni = this->n.getValue();
402    size_t ibegin = this->begin.getValue();
403    size_t zoom_end = global_zoom_begin+global_zoom_n-1;
404    size_t nZoomCount = 0;
405    size_t nbIndex = index.numElements();
406    for (size_t idx = 0; idx < nbIndex; ++idx)
407    {
408      size_t globalIndex = index(idx);
409      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
410    }
411
412    CArray<size_t,1> globalIndexAxis(nbIndex);
413    std::vector<size_t> globalAxisZoom(nZoomCount);
414    nZoomCount = 0;
415    for (size_t idx = 0; idx < nbIndex; ++idx)
416    {
417      size_t globalIndex = index(idx);
418      globalIndexAxis(idx) = globalIndex;
419      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
420      {
421        globalAxisZoom[nZoomCount] = globalIndex;
422        ++nZoomCount;
423      }
424    }
425
426    std::set<int> writtenInd;
427    if (isCompressible_)
428    {
429      for (int idx = 0; idx < data_index.numElements(); ++idx)
430      {
431        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
432
433        if (ind >= 0 && ind < ni && mask(ind))
434        {
435          ind += ibegin;
436          if (ind >= global_zoom_begin && ind <= zoom_end)
437            writtenInd.insert(ind);
438        }
439      }
440    }
441
442    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
443    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
444    CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
445    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
446    {
447      std::vector<int> nGlobAxis(1);
448      nGlobAxis[0] = n_glo.getValue();
449
450      size_t globalSizeIndex = 1, indexBegin, indexEnd;
451      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
452      indexBegin = 0;
453      for (int i = 0; i < clientSize; ++i)
454      {
455        range = globalSizeIndex / clientSize;
456        if (i < (globalSizeIndex%clientSize)) ++range;
457        if (i == client->clientRank) break;
458        indexBegin += range;
459      }
460      indexEnd = indexBegin + range - 1;
461
462      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
463      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
464      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
465      clientServerMap->computeServerIndexMapping(globalIndexAxis);
466      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
467      delete clientServerMap;
468    }
469    else
470    {
471      std::vector<size_t> globalIndexServer(n_glo.getValue());
472      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
473      {
474        globalIndexServer[idx] = idx;
475      }
476
477      for (int idx = 0; idx < nbServer; ++idx)
478      {
479        globalIndexAxisOnServer[idx] = globalIndexServer;
480      }
481    }
482
483    CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(),
484                                                         ite = globalIndexAxisOnServer.end();
485    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
486                                        iteVec = (globalAxisZoom).end();
487    indSrv_.clear();
488    indWrittenSrv_.clear();
489    for (; it != ite; ++it)
490    {
491      int rank = it->first;
492      const std::vector<size_t>& globalIndexTmp = it->second;
493      int nb = globalIndexTmp.size();
494
495      for (int i = 0; i < nb; ++i)
496      {
497        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
498        {
499          indSrv_[rank].push_back(globalIndexTmp[i]);
500        }
501
502        if (writtenInd.count(globalIndexTmp[i]))
503        {
504          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
505        }
506      }
507    }
508
509    connectedServerRank_.clear();
510    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
511      connectedServerRank_.push_back(it->first);
512    }
513
514    if (!indSrv_.empty())
515    {
516      std::map<int, vector<size_t> >::const_iterator itIndSrv  = indSrv_.begin(),
517                                                     iteIndSrv = indSrv_.end();
518      connectedServerRank_.clear();
519      for (; itIndSrv != iteIndSrv; ++itIndSrv)
520        connectedServerRank_.push_back(itIndSrv->first);
521    }
522    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
523  }
524
525  void CAxis::sendNonDistributedValue()
526  {
527    CContext* context = CContext::getCurrent();
528    CContextClient* client = context->client;
529    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
530
531    int zoom_end = global_zoom_begin + global_zoom_n - 1;
532    int nb = 0;
533    for (size_t idx = 0; idx < n; ++idx)
534    {
535      size_t globalIndex = begin + idx;
536      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
537    }
538
539    int nbWritten = 0;
540    if (isCompressible_)
541    {
542      for (int idx = 0; idx < data_index.numElements(); ++idx)
543      {
544        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
545
546        if (ind >= 0 && ind < n && mask(ind))
547        {
548          ind += begin;
549          if (ind >= global_zoom_begin && ind <= zoom_end)
550            ++nbWritten;
551        }
552      }
553    }
554
555    CArray<double,1> val(nb);
556    nb = 0;
557    for (size_t idx = 0; idx < n; ++idx)
558    {
559      size_t globalIndex = begin + idx;
560      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
561      {
562        val(nb) = value(idx);
563        ++nb;
564      }
565    }
566
567    CArray<int, 1> writtenInd(nbWritten);
568    nbWritten = 0;
569    if (isCompressible_)
570    {
571      for (int idx = 0; idx < data_index.numElements(); ++idx)
572      {
573        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
574
575        if (ind >= 0 && ind < n && mask(ind))
576        {
577          ind += begin;
578          if (ind >= global_zoom_begin && ind <= zoom_end)
579          {
580            writtenInd(nbWritten) = ind;
581            ++nbWritten;
582          }
583        }
584      }
585    }
586
587    if (client->isServerLeader())
588    {
589      std::list<CMessage> msgs;
590
591      const std::list<int>& ranks = client->getRanksServerLeader();
592      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
593      {
594        msgs.push_back(CMessage());
595        CMessage& msg = msgs.back();
596        msg << this->getId();
597        msg << val;
598        if (isCompressible_)
599          msg << writtenInd;
600        event.push(*itRank, 1, msg);
601      }
602      client->sendEvent(event);
603    }
604    else client->sendEvent(event);
605  }
606
607  void CAxis::sendDistributedValue(void)
608  {
609    int ns, n, i, j, ind, nv, idx;
610    CContext* context = CContext::getCurrent();
611    CContextClient* client=context->client;
612
613    // send value for each connected server
614    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
615    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
616
617    list<CMessage> list_msgsIndex, list_msgsVal;
618    list<CArray<int,1> > list_indi;
619    list<CArray<int,1> > list_writtenInd;
620    list<CArray<double,1> > list_val;
621    list<CArray<double,2> > list_bounds;
622
623    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
624    iteMap = indSrv_.end();
625    for (int k = 0; k < connectedServerRank_.size(); ++k)
626    {
627      int nbData = 0;
628      int rank = connectedServerRank_[k];
629      it = indSrv_.find(rank);
630      if (iteMap != it)
631        nbData = it->second.size();
632
633      list_indi.push_back(CArray<int,1>(nbData));
634      list_val.push_back(CArray<double,1>(nbData));
635
636      if (hasBounds_)
637      {
638        list_bounds.push_back(CArray<double,2>(2,nbData));
639      }
640
641      CArray<int,1>& indi = list_indi.back();
642      CArray<double,1>& val = list_val.back();
643
644      for (n = 0; n < nbData; ++n)
645      {
646        idx = static_cast<int>(it->second[n]);
647        ind = idx - begin;
648
649        val(n) = value(ind);
650        indi(n) = idx;
651
652        if (hasBounds_)
653        {
654          CArray<double,2>& boundsVal = list_bounds.back();
655          boundsVal(0, n) = bounds(0,n);
656          boundsVal(1, n) = bounds(1,n);
657        }
658      }
659
660      list_msgsIndex.push_back(CMessage());
661      list_msgsIndex.back() << this->getId() << list_indi.back();
662
663      if (isCompressible_)
664      {
665        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
666        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
667        CArray<int,1>& writtenInd = list_writtenInd.back();
668
669        for (n = 0; n < writtenInd.numElements(); ++n)
670          writtenInd(n) = writtenIndSrc[n];
671
672        list_msgsIndex.back() << writtenInd;
673      }
674
675      list_msgsVal.push_back(CMessage());
676      list_msgsVal.back() << this->getId() << list_val.back();
677
678      if (hasBounds_)
679      {
680        list_msgsVal.back() << list_bounds.back();
681      }
682
683      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
684      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
685    }
686
687    client->sendEvent(eventIndex);
688    client->sendEvent(eventVal);
689  }
690
691  void CAxis::recvIndex(CEventServer& event)
692  {
693    CAxis* axis;
694
695    list<CEventServer::SSubEvent>::iterator it;
696    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
697    {
698      CBufferIn* buffer = it->buffer;
699      string axisId;
700      *buffer >> axisId;
701      axis = get(axisId);
702      axis->recvIndex(it->rank, *buffer);
703    }
704
705    if (axis->isCompressible_)
706    {
707      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
708
709      CContextServer* server = CContext::getCurrent()->server;
710      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
711      MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
712      MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
713      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
714    }
715  }
716
717  void CAxis::recvIndex(int rank, CBufferIn& buffer)
718  {
719    buffer >> indiSrv_[rank];
720
721    if (isCompressible_)
722    {
723      CArray<int, 1> writtenIndexes;
724      buffer >> writtenIndexes;
725      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
726      for (int i = 0; i < writtenIndexes.numElements(); ++i)
727        indexesToWrite.push_back(writtenIndexes(i));
728    }
729  }
730
731  void CAxis::recvDistributedValue(CEventServer& event)
732  {
733    list<CEventServer::SSubEvent>::iterator it;
734    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
735    {
736      CBufferIn* buffer = it->buffer;
737      string axisId;
738      *buffer >> axisId;
739      get(axisId)->recvDistributedValue(it->rank, *buffer);
740    }
741  }
742
743  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
744  {
745    CArray<int,1> &indi = indiSrv_[rank];
746    CArray<double,1> val;
747    CArray<double,2> boundsVal;
748
749    buffer >> val;
750    if (hasBounds_) buffer >> boundsVal;
751
752    int i, j, ind_srv;
753    for (int ind = 0; ind < indi.numElements(); ++ind)
754    {
755      i = indi(ind);
756      ind_srv = i - zoom_begin_srv;
757      value_srv(ind_srv) = val(ind);
758      if (hasBounds_)
759      {
760        bound_srv(0,ind_srv) = boundsVal(0, ind);
761        bound_srv(1,ind_srv) = boundsVal(1, ind);
762      }
763    }
764  }
765
766   void CAxis::recvNonDistributedValue(CEventServer& event)
767  {
768    CAxis* axis;
769
770    list<CEventServer::SSubEvent>::iterator it;
771    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
772    {
773      CBufferIn* buffer = it->buffer;
774      string axisId;
775      *buffer >> axisId;
776      axis = get(axisId);
777      axis->recvNonDistributedValue(it->rank, *buffer);
778    }
779
780    if (axis->isCompressible_)
781    {
782      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
783
784      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
785      axis->offsetWrittenIndexes_ = 0;
786    }
787  }
788
789  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
790  {
791    CArray<double,1> val;
792    buffer >> val;
793
794    for (int ind = 0; ind < val.numElements(); ++ind)
795    {
796      value_srv(ind) = val(ind);
797      if (hasBounds_)
798      {
799        bound_srv(0,ind) = bounds(0,ind);
800        bound_srv(1,ind) = bounds(1,ind);
801      }
802    }
803
804    if (isCompressible_)
805    {
806      CArray<int, 1> writtenIndexes;
807      buffer >> writtenIndexes;
808      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
809      for (int i = 0; i < writtenIndexes.numElements(); ++i)
810        indexesToWrite.push_back(writtenIndexes(i));
811    }
812  }
813
814  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
815                                 CServerDistributionDescription::ServerDistributionType distType)
816  {
817    CContext* context = CContext::getCurrent();
818    CContextClient* client = context->client;
819    int nbServer = client->serverSize;
820
821    CServerDistributionDescription serverDescription(globalDim, nbServer);
822    serverDescription.computeServerDistribution();
823
824    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
825    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
826
827    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
828    if (client->isServerLeader())
829    {
830      std::list<CMessage> msgs;
831
832      const std::list<int>& ranks = client->getRanksServerLeader();
833      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
834      {
835        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
836        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
837        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
838        const int end   = begin + ni - 1;
839
840        msgs.push_back(CMessage());
841        CMessage& msg = msgs.back();
842        msg << this->getId();
843        msg << ni << begin << end;
844        msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
845        msg << isCompressible_;
846
847        event.push(*itRank,1,msg);
848      }
849      client->sendEvent(event);
850    }
851    else client->sendEvent(event);
852  }
853
854  void CAxis::recvServerAttribut(CEventServer& event)
855  {
856    CBufferIn* buffer = event.subEvents.begin()->buffer;
857    string axisId;
858    *buffer >> axisId;
859    get(axisId)->recvServerAttribut(*buffer);
860  }
861
862  void CAxis::recvServerAttribut(CBufferIn& buffer)
863  {
864    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
865
866    buffer >> ni_srv >> begin_srv >> end_srv;
867    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
868    buffer >> isCompressible_;
869    global_zoom_begin = global_zoom_begin_tmp;
870    global_zoom_n  = global_zoom_n_tmp;
871    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
872
873    zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
874    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
875    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
876
877    if (zoom_size_srv<=0)
878    {
879      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
880    }
881
882    if (n_glo == n)
883    {
884      zoom_begin_srv = global_zoom_begin;
885      zoom_end_srv   = global_zoom_end; //zoom_end;
886      zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
887    }
888    if (hasValue)
889    {
890      value_srv.resize(zoom_size_srv);
891      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
892    }
893  }
894
895  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
896  {
897    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
898    return transformationMap_.back().second;
899  }
900
901  bool CAxis::hasTransformation()
902  {
903    return (!transformationMap_.empty());
904  }
905
906  void CAxis::setTransformations(const TransMapTypes& axisTrans)
907  {
908    transformationMap_ = axisTrans;
909  }
910
911  CAxis::TransMapTypes CAxis::getAllTransformations(void)
912  {
913    return transformationMap_;
914  }
915
916  /*!
917    Check the validity of all transformations applied on axis
918  This functions is called AFTER all inherited attributes are solved
919  */
920  void CAxis::checkTransformations()
921  {
922    TransMapTypes::const_iterator itb = transformationMap_.begin(), it,
923                                  ite = transformationMap_.end();
924    for (it = itb; it != ite; ++it)
925    {
926      (it->second)->checkValid(this);
927    }
928  }
929
930  void CAxis::duplicateTransformation(CAxis* src)
931  {
932    if (src->hasTransformation())
933    {
934      this->setTransformations(src->getAllTransformations());
935    }
936  }
937
938  /*!
939   * Go through the hierarchy to find the axis from which the transformations must be inherited
940   */
941  void CAxis::solveInheritanceTransformation()
942  {
943    if (hasTransformation() || !hasDirectAxisReference())
944      return;
945
946    CAxis* axis = this;
947    std::vector<CAxis*> refAxis;
948    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
949    {
950      refAxis.push_back(axis);
951      axis = axis->getDirectAxisReference();
952    }
953
954    if (axis->hasTransformation())
955      for (size_t i = 0; i < refAxis.size(); ++i)
956        refAxis[i]->setTransformations(axis->getAllTransformations());
957  }
958
959  void CAxis::parse(xml::CXMLNode & node)
960  {
961    SuperClass::parse(node);
962
963    if (node.goToChildElement())
964    {
965      StdString nodeElementName;
966      do
967      {
968        StdString nodeId("");
969        if (node.getAttributes().end() != node.getAttributes().find("id"))
970        { nodeId = node.getAttributes()["id"]; }
971
972        nodeElementName = node.getElementName();
973        std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_.end(), it;
974        it = transformationMapList_.find(nodeElementName);
975        if (ite != it)
976        {
977          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
978                                                                                                               nodeId,
979                                                                                                               &node)));
980        }
981      } while (node.goToNextElement()) ;
982      node.goToParentElement();
983    }
984  }
985
986  DEFINE_REF_FUNC(Axis,axis)
987
988   ///---------------------------------------------------------------
989
990} // namespace xios
Note: See TracBrowser for help on using the repository browser.