source: XIOS/dev/branch_yushan/src/node/axis.cpp @ 1088

Last change on this file since 1088 was 1088, checked in by yushan, 5 years ago

save

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 32.6 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isClientAfterTransformationChecked(false)
27      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
29      , transformationMap_(), hasValue(false)
30   {
31   }
32
33   CAxis::CAxis(const StdString & id)
34      : CObjectTemplate<CAxis>(id)
35      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
36      , isClientAfterTransformationChecked(false)
37      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
38      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
39      , transformationMap_(), hasValue(false)
40   {
41   }
42
43   CAxis::~CAxis(void)
44   { /* Ne rien faire de plus */ }
45
46   //std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
47   std::map<StdString, ETranformationType> *CAxis::transformationMapList_ptr = new std::map<StdString, ETranformationType>;
48   bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(*CAxis::transformationMapList_ptr);
49   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
50   {
51     m["zoom_axis"] = TRANS_ZOOM_AXIS;
52     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
53     m["inverse_axis"] = TRANS_INVERSE_AXIS;
54     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
55     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
56   }
57
58   ///---------------------------------------------------------------
59
60   const std::set<StdString> & CAxis::getRelFiles(void) const
61   {
62      return (this->relFiles);
63   }
64
65   bool CAxis::IsWritten(const StdString & filename) const
66   {
67      return (this->relFiles.find(filename) != this->relFiles.end());
68   }
69
70   bool CAxis::isWrittenCompressed(const StdString& filename) const
71   {
72      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
73   }
74
75   bool CAxis::isDistributed(void) const
76   {
77      return isDistributed_;
78   }
79
80   /*!
81    * Test whether the data defined on the axis can be outputted in a compressed way.
82    *
83    * \return true if and only if a mask was defined for this axis
84    */
85   bool CAxis::isCompressible(void) const
86   {
87      return isCompressible_;
88   }
89
90   void CAxis::addRelFile(const StdString & filename)
91   {
92      this->relFiles.insert(filename);
93   }
94
95   void CAxis::addRelFileCompressed(const StdString& filename)
96   {
97      this->relFilesCompressed.insert(filename);
98   }
99
100   //----------------------------------------------------------------
101
102   const std::vector<int>& CAxis::getIndexesToWrite(void) const
103   {
104     return indexesToWrite;
105   }
106
107   /*!
108     Returns the number of indexes written by each server.
109     \return the number of indexes written by each server
110   */
111   int CAxis::getNumberWrittenIndexes() const
112   {
113     return numberWrittenIndexes_;
114   }
115
116   /*!
117     Returns the total number of indexes written by the servers.
118     \return the total number of indexes written by the servers
119   */
120   int CAxis::getTotalNumberWrittenIndexes() const
121   {
122     return totalNumberWrittenIndexes_;
123   }
124
125   /*!
126     Returns the offset of indexes written by each server.
127     \return the offset of indexes written by each server
128   */
129   int CAxis::getOffsetWrittenIndexes() const
130   {
131     return offsetWrittenIndexes_;
132   }
133
134   //----------------------------------------------------------------
135
136   /*!
137    * Compute the minimum buffer size required to send the attributes to the server(s).
138    *
139    * \return A map associating the server rank with its minimum buffer size.
140    */
141   std::map<int, StdSize> CAxis::getAttributesBufferSize()
142   {
143     CContextClient* client = CContext::getCurrent()->client;
144
145     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
146
147     bool isNonDistributed = (n == n_glo);
148
149     if (client->isServerLeader())
150     {
151       // size estimation for sendServerAttribut
152       size_t size = 6 * sizeof(size_t);
153       // size estimation for sendNonDistributedValue
154       if (isNonDistributed)
155         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
156       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
157
158       const std::list<int>& ranks = client->getRanksServerLeader();
159       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
160       {
161         if (size > attributesSizes[*itRank])
162           attributesSizes[*itRank] = size;
163       }
164     }
165
166     if (!isNonDistributed)
167     {
168       // size estimation for sendDistributedValue
169       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
170       for (it = indSrv_.begin(); it != ite; ++it)
171       {
172         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
173         if (isCompressible_)
174           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
175
176         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
177         if (hasBounds_)
178           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
179
180         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
181         if (size > attributesSizes[it->first])
182           attributesSizes[it->first] = size;
183       }
184     }
185
186     return attributesSizes;
187   }
188
189   //----------------------------------------------------------------
190
191   StdString CAxis::GetName(void)   { return (StdString("axis")); }
192   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
193   ENodeType CAxis::GetType(void)   { return (eAxis); }
194
195   //----------------------------------------------------------------
196
197   CAxis* CAxis::createAxis()
198   {
199     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
200     return axis;
201   }
202
203   void CAxis::fillInValues(const CArray<double,1>& values)
204   {
205     this->value = values;
206   }
207
208   void CAxis::checkAttributes(void)
209   {
210      if (this->n_glo.isEmpty())
211        ERROR("CAxis::checkAttributes(void)",
212              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
213              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
214      StdSize size = this->n_glo.getValue();
215
216      if (!this->index.isEmpty())
217      {
218        if (n.isEmpty()) n = index.numElements();
219
220        // It's not so correct but if begin is not the first value of index
221        // then data on the local axis has user-defined distribution. In this case, begin has no meaning.
222        if (begin.isEmpty()) begin = index(0);         
223      }
224      else 
225      {
226        if (!this->begin.isEmpty())
227        {
228          if (begin < 0 || begin > size - 1)
229            ERROR("CAxis::checkAttributes(void)",
230                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
231                  << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
232        }
233        else this->begin.setValue(0);
234
235        if (!this->n.isEmpty())
236        {
237          if (n < 0 || n > size)
238            ERROR("CAxis::checkAttributes(void)",
239                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
240                  << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
241        }
242        else this->n.setValue(size);
243
244        {
245          index.resize(n);
246          for (int i = 0; i < n; ++i) index(i) = i+begin;
247        }
248      }
249
250      if (!this->value.isEmpty())
251      {
252        StdSize true_size = value.numElements();
253        if (this->n.getValue() != true_size)
254          ERROR("CAxis::checkAttributes(void)",
255                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
256                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
257        this->hasValue = true;
258      }
259
260      this->checkData();
261      this->checkZoom();
262      this->checkMask();
263      this->checkBounds();
264
265      isDistributed_ = (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
266                       (!this->n.isEmpty() && (this->n != this->n_glo));
267   }
268
269   void CAxis::checkData()
270   {
271      if (data_begin.isEmpty()) data_begin.setValue(0);
272
273      if (data_n.isEmpty())
274      {
275        data_n.setValue(n);
276      }
277      else if (data_n.getValue() < 0)
278      {
279        ERROR("CAxis::checkData(void)",
280              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
281              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
282      }
283
284      if (data_index.isEmpty())
285      {
286        data_index.resize(data_n);
287        for (int i = 0; i < data_n; ++i) data_index(i) = i;
288      }
289   }
290
291   void CAxis::checkZoom(void)
292   {
293     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
294     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
295   }
296
297   void CAxis::checkMask()
298   {
299      if (!mask.isEmpty())
300      {
301         if (mask.extent(0) != n)
302           ERROR("CAxis::checkMask(void)",
303                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
304                 << "The mask does not have the same size as the local domain." << std::endl
305                 << "Local size is " << n.getValue() << "." << std::endl
306                 << "Mask size is " << mask.extent(0) << ".");
307      }
308      else // (mask.isEmpty())
309      { // If no mask was defined, we create a default one without any masked point.
310         mask.resize(n);
311         for (int i = 0; i < n; ++i)
312         {
313           mask(i) = true;
314         }
315      }
316   }
317
318  void CAxis::checkBounds()
319  {
320    if (!bounds.isEmpty())
321    {
322      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
323        ERROR("CAxis::checkAttributes(void)",
324              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
325              << "Axis size is " << n.getValue() << "." << std::endl
326              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
327      hasBounds_ = true;
328    }
329    else hasBounds_ = false;
330  }
331
332  void CAxis::checkEligibilityForCompressedOutput()
333  {
334    // We don't check if the mask is valid here, just if a mask has been defined at this point.
335    isCompressible_ = !mask.isEmpty();
336  }
337
338  bool CAxis::dispatchEvent(CEventServer& event)
339   {
340      if (SuperClass::dispatchEvent(event)) return true;
341      else
342      {
343        switch(event.type)
344        {
345           case EVENT_ID_SERVER_ATTRIBUT :
346             recvServerAttribut(event);
347             return true;
348             break;
349           case EVENT_ID_INDEX:
350            recvIndex(event);
351            return true;
352            break;
353          case EVENT_ID_DISTRIBUTED_VALUE:
354            recvDistributedValue(event);
355            return true;
356            break;
357          case EVENT_ID_NON_DISTRIBUTED_VALUE:
358            recvNonDistributedValue(event);
359            return true;
360            break;
361           default :
362             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
363                    << "Unknown Event");
364           return false;
365         }
366      }
367   }
368
369   void CAxis::checkAttributesOnClient()
370   {
371     if (this->areClientAttributesChecked_) return;
372
373     this->checkAttributes();
374
375     this->areClientAttributesChecked_ = true;
376   }
377
378   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
379                                                          CServerDistributionDescription::ServerDistributionType distType)
380   {
381     CContext* context=CContext::getCurrent() ;
382
383     if (this->isClientAfterTransformationChecked) return;
384     if (context->hasClient)
385     {
386       if (n.getValue() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
387     }
388
389     this->isClientAfterTransformationChecked = true;
390   }
391
392   // Send all checked attributes to server
393   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
394                                     CServerDistributionDescription::ServerDistributionType distType)
395   {
396     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
397     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
398     CContext* context = CContext::getCurrent();
399
400     if (this->isChecked) return;
401     if (context->hasClient)
402     {
403       sendServerAttribut(globalDim, orderPositionInGrid, distType);
404       if (hasValue) sendValue();
405     }
406
407     this->isChecked = true;
408   }
409
410  void CAxis::sendValue()
411  {
412     if (n.getValue() == n_glo.getValue())
413       sendNonDistributedValue();
414     else
415       sendDistributedValue();
416  }
417
418  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
419                                     CServerDistributionDescription::ServerDistributionType distType)
420  {
421    CContext* context = CContext::getCurrent();
422    CContextClient* client = context->client;
423    int nbServer = client->serverSize;
424    int range, clientSize = client->clientSize;
425    int rank = client->clientRank;
426
427    size_t ni = this->n.getValue();
428    size_t ibegin = this->begin.getValue();
429    size_t zoom_end = global_zoom_begin+global_zoom_n-1;
430    size_t nZoomCount = 0;
431    size_t nbIndex = index.numElements();
432    for (size_t idx = 0; idx < nbIndex; ++idx)
433    {
434      size_t globalIndex = index(idx);
435      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
436    }
437
438    CArray<size_t,1> globalIndexAxis(nbIndex);
439    std::vector<size_t> globalAxisZoom(nZoomCount);
440    nZoomCount = 0;
441    for (size_t idx = 0; idx < nbIndex; ++idx)
442    {
443      size_t globalIndex = index(idx);
444      globalIndexAxis(idx) = globalIndex;
445      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
446      {
447        globalAxisZoom[nZoomCount] = globalIndex;
448        ++nZoomCount;
449      }
450    }
451
452    std::set<int> writtenInd;
453    if (isCompressible_)
454    {
455      for (int idx = 0; idx < data_index.numElements(); ++idx)
456      {
457        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
458
459        if (ind >= 0 && ind < ni && mask(ind))
460        {
461          ind += ibegin;
462          if (ind >= global_zoom_begin && ind <= zoom_end)
463            writtenInd.insert(ind);
464        }
465      }
466    }
467
468    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
469    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
470    CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
471    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
472    {
473      std::vector<int> nGlobAxis(1);
474      nGlobAxis[0] = n_glo.getValue();
475
476      size_t globalSizeIndex = 1, indexBegin, indexEnd;
477      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
478      indexBegin = 0;
479      if (globalSizeIndex <= clientSize)
480      {
481        indexBegin = rank%globalSizeIndex;
482        indexEnd = indexBegin;
483      }
484      else
485      {
486        for (int i = 0; i < clientSize; ++i)
487        {
488          range = globalSizeIndex / clientSize;
489          if (i < (globalSizeIndex%clientSize)) ++range;
490          if (i == client->clientRank) break;
491          indexBegin += range;
492        }
493        indexEnd = indexBegin + range - 1;
494      }
495
496      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
497      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
498      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
499      clientServerMap->computeServerIndexMapping(globalIndexAxis);
500      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
501      delete clientServerMap;
502    }
503    else
504    {
505      std::vector<size_t> globalIndexServer(n_glo.getValue());
506      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
507      {
508        globalIndexServer[idx] = idx;
509      }
510
511      for (int idx = 0; idx < nbServer; ++idx)
512      {
513        globalIndexAxisOnServer[idx] = globalIndexServer;
514      }
515    }
516
517    CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(),
518                                                         ite = globalIndexAxisOnServer.end();
519    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
520                                        iteVec = (globalAxisZoom).end();
521    indSrv_.clear();
522    indWrittenSrv_.clear();
523    for (; it != ite; ++it)
524    {
525      int rank = it->first;
526      const std::vector<size_t>& globalIndexTmp = it->second;
527      int nb = globalIndexTmp.size();
528
529      for (int i = 0; i < nb; ++i)
530      {
531        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
532        {
533          indSrv_[rank].push_back(globalIndexTmp[i]);
534        }
535
536        if (writtenInd.count(globalIndexTmp[i]))
537        {
538          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
539        }
540      }
541    }
542
543    connectedServerRank_.clear();
544    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
545      connectedServerRank_.push_back(it->first);
546    }
547
548    if (!indSrv_.empty())
549    {
550      std::map<int, vector<size_t> >::const_iterator itIndSrv  = indSrv_.begin(),
551                                                     iteIndSrv = indSrv_.end();
552      connectedServerRank_.clear();
553      for (; itIndSrv != iteIndSrv; ++itIndSrv)
554        connectedServerRank_.push_back(itIndSrv->first);
555    }
556    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
557  }
558
559  void CAxis::sendNonDistributedValue()
560  {
561    CContext* context = CContext::getCurrent();
562    CContextClient* client = context->client;
563    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
564
565    int zoom_end = global_zoom_begin + global_zoom_n - 1;
566    int nb = 0;
567    for (size_t idx = 0; idx < n; ++idx)
568    {
569      size_t globalIndex = begin + idx;
570      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
571    }
572
573    int nbWritten = 0;
574    if (isCompressible_)
575    {
576      for (int idx = 0; idx < data_index.numElements(); ++idx)
577      {
578        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
579
580        if (ind >= 0 && ind < n && mask(ind))
581        {
582          ind += begin;
583          if (ind >= global_zoom_begin && ind <= zoom_end)
584            ++nbWritten;
585        }
586      }
587    }
588
589    CArray<double,1> val(nb);
590    nb = 0;
591    for (size_t idx = 0; idx < n; ++idx)
592    {
593      size_t globalIndex = begin + idx;
594      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
595      {
596        val(nb) = value(idx);
597        ++nb;
598      }
599    }
600
601    CArray<int, 1> writtenInd(nbWritten);
602    nbWritten = 0;
603    if (isCompressible_)
604    {
605      for (int idx = 0; idx < data_index.numElements(); ++idx)
606      {
607        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
608
609        if (ind >= 0 && ind < n && mask(ind))
610        {
611          ind += begin;
612          if (ind >= global_zoom_begin && ind <= zoom_end)
613          {
614            writtenInd(nbWritten) = ind;
615            ++nbWritten;
616          }
617        }
618      }
619    }
620
621    if (client->isServerLeader())
622    {
623      std::list<CMessage> msgs;
624
625      const std::list<int>& ranks = client->getRanksServerLeader();
626      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
627      {
628        msgs.push_back(CMessage());
629        CMessage& msg = msgs.back();
630        msg << this->getId();
631        msg << val;
632        if (isCompressible_)
633          msg << writtenInd;
634        event.push(*itRank, 1, msg);
635      }
636      client->sendEvent(event);
637    }
638    else client->sendEvent(event);
639  }
640
641  void CAxis::sendDistributedValue(void)
642  {
643    int ns, n, i, j, ind, nv, idx;
644    CContext* context = CContext::getCurrent();
645    CContextClient* client=context->client;
646
647    // send value for each connected server
648    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
649    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
650
651    list<CMessage> list_msgsIndex, list_msgsVal;
652    list<CArray<int,1> > list_indi;
653    list<CArray<int,1> > list_writtenInd;
654    list<CArray<double,1> > list_val;
655    list<CArray<double,2> > list_bounds;
656
657    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
658    iteMap = indSrv_.end();
659    for (int k = 0; k < connectedServerRank_.size(); ++k)
660    {
661      int nbData = 0;
662      int rank = connectedServerRank_[k];
663      it = indSrv_.find(rank);
664      if (iteMap != it)
665        nbData = it->second.size();
666
667      list_indi.push_back(CArray<int,1>(nbData));
668      list_val.push_back(CArray<double,1>(nbData));
669
670      if (hasBounds_)
671      {
672        list_bounds.push_back(CArray<double,2>(2,nbData));
673      }
674
675      CArray<int,1>& indi = list_indi.back();
676      CArray<double,1>& val = list_val.back();
677
678      for (n = 0; n < nbData; ++n)
679      {
680        idx = static_cast<int>(it->second[n]);
681        ind = idx - begin;
682
683        val(n) = value(ind);
684        indi(n) = idx;
685
686        if (hasBounds_)
687        {
688          CArray<double,2>& boundsVal = list_bounds.back();
689          boundsVal(0, n) = bounds(0,n);
690          boundsVal(1, n) = bounds(1,n);
691        }
692      }
693
694      list_msgsIndex.push_back(CMessage());
695      list_msgsIndex.back() << this->getId() << list_indi.back();
696
697      if (isCompressible_)
698      {
699        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
700        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
701        CArray<int,1>& writtenInd = list_writtenInd.back();
702
703        for (n = 0; n < writtenInd.numElements(); ++n)
704          writtenInd(n) = writtenIndSrc[n];
705
706        list_msgsIndex.back() << writtenInd;
707      }
708
709      list_msgsVal.push_back(CMessage());
710      list_msgsVal.back() << this->getId() << list_val.back();
711
712      if (hasBounds_)
713      {
714        list_msgsVal.back() << list_bounds.back();
715      }
716
717      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
718      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
719    }
720
721    client->sendEvent(eventIndex);
722    client->sendEvent(eventVal);
723  }
724
725  void CAxis::recvIndex(CEventServer& event)
726  {
727    CAxis* axis;
728
729    list<CEventServer::SSubEvent>::iterator it;
730    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
731    {
732      CBufferIn* buffer = it->buffer;
733      string axisId;
734      *buffer >> axisId;
735      axis = get(axisId);
736      axis->recvIndex(it->rank, *buffer);
737    }
738
739    if (axis->isCompressible_)
740    {
741      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
742
743      CContextServer* server = CContext::getCurrent()->server;
744      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
745      ep_lib::MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
746      ep_lib::MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
747      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
748    }
749  }
750
751  void CAxis::recvIndex(int rank, CBufferIn& buffer)
752  {
753    buffer >> indiSrv_[rank];
754
755    if (isCompressible_)
756    {
757      CArray<int, 1> writtenIndexes;
758      buffer >> writtenIndexes;
759      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
760      for (int i = 0; i < writtenIndexes.numElements(); ++i)
761        indexesToWrite.push_back(writtenIndexes(i));
762    }
763  }
764
765  void CAxis::recvDistributedValue(CEventServer& event)
766  {
767    list<CEventServer::SSubEvent>::iterator it;
768    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
769    {
770      CBufferIn* buffer = it->buffer;
771      string axisId;
772      *buffer >> axisId;
773      get(axisId)->recvDistributedValue(it->rank, *buffer);
774    }
775  }
776
777  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
778  {
779    CArray<int,1> &indi = indiSrv_[rank];
780    CArray<double,1> val;
781    CArray<double,2> boundsVal;
782
783    buffer >> val;
784    if (hasBounds_) buffer >> boundsVal;
785
786    int i, j, ind_srv;
787    for (int ind = 0; ind < indi.numElements(); ++ind)
788    {
789      i = indi(ind);
790      ind_srv = i - zoom_begin_srv;
791      value_srv(ind_srv) = val(ind);
792      if (hasBounds_)
793      {
794        bound_srv(0,ind_srv) = boundsVal(0, ind);
795        bound_srv(1,ind_srv) = boundsVal(1, ind);
796      }
797    }
798  }
799
800   void CAxis::recvNonDistributedValue(CEventServer& event)
801  {
802    CAxis* axis;
803
804    list<CEventServer::SSubEvent>::iterator it;
805    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
806    {
807      CBufferIn* buffer = it->buffer;
808      string axisId;
809      *buffer >> axisId;
810      axis = get(axisId);
811      axis->recvNonDistributedValue(it->rank, *buffer);
812    }
813
814    if (axis->isCompressible_)
815    {
816      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
817
818      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
819      axis->offsetWrittenIndexes_ = 0;
820    }
821  }
822
823  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
824  {
825    CArray<double,1> val;
826    buffer >> val;
827
828    for (int ind = 0; ind < val.numElements(); ++ind)
829    {
830      value_srv(ind) = val(ind);
831      if (hasBounds_)
832      {
833        bound_srv(0,ind) = bounds(0,ind);
834        bound_srv(1,ind) = bounds(1,ind);
835      }
836    }
837
838    if (isCompressible_)
839    {
840      CArray<int, 1> writtenIndexes;
841      buffer >> writtenIndexes;
842      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
843      for (int i = 0; i < writtenIndexes.numElements(); ++i)
844        indexesToWrite.push_back(writtenIndexes(i));
845    }
846  }
847
848  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
849                                 CServerDistributionDescription::ServerDistributionType distType)
850  {
851    CContext* context = CContext::getCurrent();
852    CContextClient* client = context->client;
853    int nbServer = client->serverSize;
854
855    CServerDistributionDescription serverDescription(globalDim, nbServer);
856    serverDescription.computeServerDistribution();
857
858    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
859    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
860
861    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
862    if (client->isServerLeader())
863    {
864      std::list<CMessage> msgs;
865
866      const std::list<int>& ranks = client->getRanksServerLeader();
867      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
868      {
869        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
870        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
871        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
872        const int end   = begin + ni - 1;
873
874        msgs.push_back(CMessage());
875        CMessage& msg = msgs.back();
876        msg << this->getId();
877        msg << ni << begin << end;
878        msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
879        msg << isCompressible_;
880
881        event.push(*itRank,1,msg);
882      }
883      client->sendEvent(event);
884    }
885    else client->sendEvent(event);
886  }
887
888  void CAxis::recvServerAttribut(CEventServer& event)
889  {
890    CBufferIn* buffer = event.subEvents.begin()->buffer;
891    string axisId;
892    *buffer >> axisId;
893    get(axisId)->recvServerAttribut(*buffer);
894  }
895
896  void CAxis::recvServerAttribut(CBufferIn& buffer)
897  {
898    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
899
900    buffer >> ni_srv >> begin_srv >> end_srv;
901    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
902    buffer >> isCompressible_;
903    global_zoom_begin = global_zoom_begin_tmp;
904    global_zoom_n  = global_zoom_n_tmp;
905    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
906
907    zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
908    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
909    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
910
911    if (zoom_size_srv<=0)
912    {
913      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
914    }
915
916    if (n_glo == n)
917    {
918      zoom_begin_srv = global_zoom_begin;
919      zoom_end_srv   = global_zoom_end; //zoom_end;
920      zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
921    }
922    if (hasValue)
923    {
924      value_srv.resize(zoom_size_srv);
925      if (hasBounds_)  bound_srv.resize(2,zoom_size_srv);
926    }
927  }
928
929  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
930  {
931    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
932    return transformationMap_.back().second;
933  }
934
935  bool CAxis::hasTransformation()
936  {
937    return (!transformationMap_.empty());
938  }
939
940  void CAxis::setTransformations(const TransMapTypes& axisTrans)
941  {
942    transformationMap_ = axisTrans;
943  }
944
945  CAxis::TransMapTypes CAxis::getAllTransformations(void)
946  {
947    return transformationMap_;
948  }
949
950  /*!
951    Check the validity of all transformations applied on axis
952  This functions is called AFTER all inherited attributes are solved
953  */
954  void CAxis::checkTransformations()
955  {
956    TransMapTypes::const_iterator itb = transformationMap_.begin(), it,
957                                  ite = transformationMap_.end();
958//    for (it = itb; it != ite; ++it)
959//    {
960//      (it->second)->checkValid(this);
961//    }
962  }
963
964  void CAxis::duplicateTransformation(CAxis* src)
965  {
966    if (src->hasTransformation())
967    {
968      this->setTransformations(src->getAllTransformations());
969    }
970  }
971
972  /*!
973   * Go through the hierarchy to find the axis from which the transformations must be inherited
974   */
975  void CAxis::solveInheritanceTransformation()
976  {
977    if (hasTransformation() || !hasDirectAxisReference())
978      return;
979
980    CAxis* axis = this;
981    std::vector<CAxis*> refAxis;
982    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
983    {
984      refAxis.push_back(axis);
985      axis = axis->getDirectAxisReference();
986    }
987
988    if (axis->hasTransformation())
989      for (size_t i = 0; i < refAxis.size(); ++i)
990        refAxis[i]->setTransformations(axis->getAllTransformations());
991  }
992
993  void CAxis::parse(xml::CXMLNode & node)
994  {
995    SuperClass::parse(node);
996
997    if (node.goToChildElement())
998    {
999      StdString nodeElementName;
1000      do
1001      {
1002        StdString nodeId("");
1003        if (node.getAttributes().end() != node.getAttributes().find("id"))
1004        { nodeId = node.getAttributes()["id"]; }
1005
1006        nodeElementName = node.getElementName();
1007        std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_ptr->end(), it;
1008        it = transformationMapList_ptr->find(nodeElementName);
1009        if (ite != it)
1010        {
1011          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1012                                                                                                               nodeId,
1013                                                                                                               &node)));
1014        }
1015        else
1016        {
1017          ERROR("void CAxis::parse(xml::CXMLNode & node)",
1018                << "The transformation " << nodeElementName << " has not been supported yet.");
1019        }
1020      } while (node.goToNextElement()) ;
1021      node.goToParentElement();
1022    }
1023  }
1024
1025  DEFINE_REF_FUNC(Axis,axis)
1026
1027   ///---------------------------------------------------------------
1028
1029} // namespace xios
Note: See TracBrowser for help on using the repository browser.