source: XIOS/dev/dev_olga/src/node/axis.cpp @ 1202

Last change on this file since 1202 was 1202, checked in by mhnguyen, 7 years ago

Porting non-continuous axis zoom to dev branch

+) Port axis zoom
+) Resolve some merge conflicts
+) Revert some codes

Test
+) On Curie
+) Ok

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 41.0 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isClientAfterTransformationChecked(false)
27      , hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
29      , transformationMap_(), hasValue(false), hasLabel(false)
30      , computedWrittenIndex_(false)
31   {
32   }
33
34   CAxis::CAxis(const StdString & id)
35      : CObjectTemplate<CAxis>(id)
36      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
37      , isClientAfterTransformationChecked(false)
38      , hasBounds_(false), isCompressible_(false)
39      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
40      , transformationMap_(), hasValue(false), hasLabel(false)
41      , computedWrittenIndex_(false)
42   {
43   }
44
45   CAxis::~CAxis(void)
46   { /* Ne rien faire de plus */ }
47
48   std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
49   bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_);
50   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
51   {
52     m["zoom_axis"] = TRANS_ZOOM_AXIS;
53     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
54     m["inverse_axis"] = TRANS_INVERSE_AXIS;
55     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
56     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
57   }
58
59   ///---------------------------------------------------------------
60
61   const std::set<StdString> & CAxis::getRelFiles(void) const
62   {
63      return (this->relFiles);
64   }
65
66   bool CAxis::IsWritten(const StdString & filename) const
67   {
68      return (this->relFiles.find(filename) != this->relFiles.end());
69   }
70
71   bool CAxis::isWrittenCompressed(const StdString& filename) const
72   {
73      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
74   }
75
76   bool CAxis::isDistributed(void) const
77   {
78      bool distributed = (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
79             (!this->n.isEmpty() && (this->n != this->n_glo));
80      // A same stupid condition to make sure that if there is only one client, axis
81      // should be considered to be distributed. This should be a temporary solution     
82      distributed |= (1 == CContext::getCurrent()->client->clientSize);
83      return distributed;
84   }
85
86   /*!
87    * Test whether the data defined on the axis can be outputted in a compressed way.
88    *
89    * \return true if and only if a mask was defined for this axis
90    */
91   bool CAxis::isCompressible(void) const
92   {
93      return isCompressible_;
94   }
95
96   void CAxis::addRelFile(const StdString & filename)
97   {
98      this->relFiles.insert(filename);
99   }
100
101   void CAxis::addRelFileCompressed(const StdString& filename)
102   {
103      this->relFilesCompressed.insert(filename);
104   }
105
106   //----------------------------------------------------------------
107
108   /*!
109     Returns the number of indexes written by each server.
110     \return the number of indexes written by each server
111   */
112   int CAxis::getNumberWrittenIndexes() const
113   {
114     return numberWrittenIndexes_;
115   }
116
117   /*!
118     Returns the total number of indexes written by the servers.
119     \return the total number of indexes written by the servers
120   */
121   int CAxis::getTotalNumberWrittenIndexes() const
122   {
123     return totalNumberWrittenIndexes_;
124   }
125
126   /*!
127     Returns the offset of indexes written by each server.
128     \return the offset of indexes written by each server
129   */
130   int CAxis::getOffsetWrittenIndexes() const
131   {
132     return offsetWrittenIndexes_;
133   }
134
135   //----------------------------------------------------------------
136
137   /*!
138    * Compute the minimum buffer size required to send the attributes to the server(s).
139    *
140    * \return A map associating the server rank with its minimum buffer size.
141    */
142   std::map<int, StdSize> CAxis::getAttributesBufferSize()
143   {
144     // For now the assumption is that secondary server pools consist of the same number of procs.
145     // CHANGE the line below if the assumption changes.
146     CContext* context = CContext::getCurrent();
147     CContextClient* client = (0 != context->clientPrimServer.size()) ? context->clientPrimServer[0] : context->client;
148
149     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
150
151     bool isNonDistributed = (n == n_glo);
152
153     if (client->isServerLeader())
154     {
155       // size estimation for sendServerAttribut
156       size_t size = 6 * sizeof(size_t);
157       // size estimation for sendNonDistributedValue
158       if (isNonDistributed)
159         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
160       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
161
162       const std::list<int>& ranks = client->getRanksServerLeader();
163       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
164       {
165         if (size > attributesSizes[*itRank])
166           attributesSizes[*itRank] = size;
167       }
168     }
169
170     if (!isNonDistributed)
171     {
172       // size estimation for sendDistributedValue
173       boost::unordered_map<int, vector<size_t> >::const_iterator it, ite = indSrv_.end();
174       for (it = indSrv_.begin(); it != ite; ++it)
175       {
176         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
177         if (isCompressible_)
178           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
179
180         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
181         if (hasBounds_)
182           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
183 
184         if (hasLabel)
185           sizeValEvent += CArray<StdString,1>::size(it->second.size());
186
187         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
188         if (size > attributesSizes[it->first])
189           attributesSizes[it->first] = size;
190       }
191     }
192
193     return attributesSizes;
194   }
195
196   //----------------------------------------------------------------
197
198   StdString CAxis::GetName(void)   { return (StdString("axis")); }
199   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
200   ENodeType CAxis::GetType(void)   { return (eAxis); }
201
202   //----------------------------------------------------------------
203
204   CAxis* CAxis::createAxis()
205   {
206     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
207     return axis;
208   }
209
210   /*!
211     Check common attributes of an axis.
212     This check should be done in the very beginning of work flow
213   */
214   void CAxis::checkAttributes(void)
215   {
216      if (this->n_glo.isEmpty())
217        ERROR("CAxis::checkAttributes(void)",
218              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
219              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
220      StdSize size = this->n_glo.getValue();
221
222      if (!this->index.isEmpty())
223      {
224        if (n.isEmpty()) n = index.numElements();
225
226        // It's not so correct but if begin is not the first value of index
227        // then data on the local axis has user-defined distribution. In this case, begin has no meaning.
228        if (begin.isEmpty()) begin = index(0);         
229      }
230      else 
231      {
232        if (!this->begin.isEmpty())
233        {
234          if (begin < 0 || begin > size - 1)
235            ERROR("CAxis::checkAttributes(void)",
236                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
237                  << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
238        }
239        else this->begin.setValue(0);
240
241        if (!this->n.isEmpty())
242        {
243          if (n < 0 || n > size)
244            ERROR("CAxis::checkAttributes(void)",
245                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
246                  << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
247        }
248        else this->n.setValue(size);
249
250        {
251          index.resize(n);
252          for (int i = 0; i < n; ++i) index(i) = i+begin;
253        }
254      }
255
256      if (!this->value.isEmpty())
257      {
258        StdSize true_size = value.numElements();
259        if (this->n.getValue() != true_size)
260          ERROR("CAxis::checkAttributes(void)",
261                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
262                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
263        this->hasValue = true;
264      }
265
266      this->checkData();
267      this->checkZoom();
268      this->checkMask();
269      this->checkBounds();
270      this->checkLabel();
271   }
272
273   /*!
274      Check the validity of data and fill in values if any.
275   */
276   void CAxis::checkData()
277   {
278      if (data_begin.isEmpty()) data_begin.setValue(0);
279
280      if (data_n.isEmpty())
281      {
282        data_n.setValue(n);
283      }
284      else if (data_n.getValue() < 0)
285      {
286        ERROR("CAxis::checkData(void)",
287              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
288              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
289      }
290
291      if (data_index.isEmpty())
292      {
293        data_index.resize(data_n);
294        for (int i = 0; i < data_n; ++i) data_index(i) = i;
295      }
296   }
297
298   /*!
299     Check validity of zoom info and fill in values if any.
300   */
301   void CAxis::checkZoom(void)
302   {
303     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
304     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
305     if (zoom_index.isEmpty())
306     {
307       zoom_index.setValue(index.getValue());
308     }
309     if (zoom_n.isEmpty()) zoom_n.setValue(n);
310     if (zoom_begin.isEmpty()) zoom_begin.setValue(begin);
311   }
312
313   /*!
314     Check validity of mask info and fill in values if any.
315   */
316   void CAxis::checkMask()
317   {
318      if (!mask.isEmpty())
319      {
320         if (mask.extent(0) != n)
321           ERROR("CAxis::checkMask(void)",
322                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
323                 << "The mask does not have the same size as the local domain." << std::endl
324                 << "Local size is " << n.getValue() << "." << std::endl
325                 << "Mask size is " << mask.extent(0) << ".");
326      }
327      else // (mask.isEmpty())
328      { // If no mask was defined, we create a default one without any masked point.
329         mask.resize(n);
330         for (int i = 0; i < n; ++i)
331         {
332           mask(i) = true;
333         }
334      }
335   }
336
337   /*!
338     Check validity of bounds info and fill in values if any.
339   */
340   void CAxis::checkBounds()
341   {
342     if (!bounds.isEmpty())
343     {
344       if (bounds.extent(0) != 2 || bounds.extent(1) != n)
345         ERROR("CAxis::checkAttributes(void)",
346               << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
347               << "Axis size is " << n.getValue() << "." << std::endl
348               << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
349       hasBounds_ = true;
350     }
351     else hasBounds_ = false;
352   }
353
354  void CAxis::checkLabel()
355  {
356    if (!label.isEmpty())
357    {
358      if (label.extent(0) != n)
359        ERROR("CAxis::checkLabel(void)",
360              << "The label array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension of axis size." << std::endl
361              << "Axis size is " << n.getValue() << "." << std::endl
362              << "label size is "<< label.extent(0)<<  " .");
363      hasLabel = true;
364    }
365    else hasLabel = false;
366  }
367
368  void CAxis::checkEligibilityForCompressedOutput()
369  {
370    // We don't check if the mask is valid here, just if a mask has been defined at this point.
371    isCompressible_ = !mask.isEmpty();
372  }
373
374  bool CAxis::zoomByIndex()
375  {
376    return (!global_zoom_index.isEmpty() && (0 != global_zoom_index.numElements()));
377  }
378
379   bool CAxis::dispatchEvent(CEventServer& event)
380   {
381      if (SuperClass::dispatchEvent(event)) return true;
382      else
383      {
384        switch(event.type)
385        {
386           case EVENT_ID_DISTRIBUTION_ATTRIBUTE :
387             recvDistributionAttribute(event);
388             return true;
389             break;
390          case EVENT_ID_NON_DISTRIBUTED_ATTRIBUTES:
391            recvNonDistributedAttributes(event);
392            return true;
393            break;
394          case EVENT_ID_DISTRIBUTED_ATTRIBUTES:
395            recvDistributedAttributes(event);
396            return true;
397            break;
398           default :
399             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
400                    << "Unknown Event");
401           return false;
402         }
403      }
404   }
405
406   /*!
407     Check attributes on client side (This name is still adequate???)
408   */
409   void CAxis::checkAttributesOnClient()
410   {
411     if (this->areClientAttributesChecked_) return;
412
413     this->checkAttributes();
414
415     this->areClientAttributesChecked_ = true;
416   }
417
418   /*
419     The (spatial) transformation sometimes can change attributes of an axis. Therefore, we should recheck them.
420   */
421   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
422                                                          CServerDistributionDescription::ServerDistributionType distType)
423   {
424     CContext* context=CContext::getCurrent() ;
425
426     if (this->isClientAfterTransformationChecked) return;
427     if (context->hasClient)
428     {
429       if (index.numElements() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
430     }
431
432     this->isClientAfterTransformationChecked = true;
433   }
434
435   // Send all checked attributes to server
436   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
437                                     CServerDistributionDescription::ServerDistributionType distType)
438   {
439     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
440     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
441     CContext* context = CContext::getCurrent();
442
443     if (this->isChecked) return;
444     if (context->hasClient) sendAttributes(globalDim, orderPositionInGrid, distType);   
445
446     this->isChecked = true;
447   }
448
449  /*!
450    Send attributes from one client to other clients
451    \param[in] globalDim global dimension of grid which contains this axis
452    \param[in] order
453  */
454  void CAxis::sendAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
455                             CServerDistributionDescription::ServerDistributionType distType)
456  {
457     if (index.numElements() == n_glo.getValue())
458       sendNonDistributedAttributes();
459     else
460     {
461       sendDistributedAttributes();       
462     }
463     sendDistributionAttribute(globalDim, orderPositionInGrid, distType);
464  }
465
466  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
467                                     CServerDistributionDescription::ServerDistributionType distType)
468  {
469    CContext* context = CContext::getCurrent();
470
471    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1;
472    for (int p = 0; p < nbSrvPools; ++p)
473    {
474      CContextClient* client = (0 != context->clientPrimServer.size()) ? context->clientPrimServer[p] : context->client;
475      int nbServer = client->serverSize;
476      int range, clientSize = client->clientSize;
477      int rank = client->clientRank;
478
479      size_t ni = this->n.getValue();
480      size_t ibegin = this->begin.getValue();
481      size_t global_zoom_end = global_zoom_begin+global_zoom_n-1;
482      size_t nZoomCount = 0;
483      size_t nbIndex = index.numElements();
484
485      for (size_t idx = 0; idx < nbIndex; ++idx)
486      {
487        globalLocalIndexMap_[index(idx)] = idx;
488      }
489      std::set<int> writtenInd;
490      if (isCompressible_)
491      {
492        for (int idx = 0; idx < data_index.numElements(); ++idx)
493        {
494          int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
495
496          if (ind >= 0 && ind < ni && mask(ind))
497          {
498            ind += ibegin;
499            if (ind >= global_zoom_begin && ind <= global_zoom_end)
500              writtenInd.insert(ind);
501          }
502        }
503      }
504
505      CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer, distType);
506      int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
507      CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
508      if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
509      {
510        std::vector<int> nGlobAxis(1);
511        nGlobAxis[0] = n_glo.getValue();
512
513        size_t globalSizeIndex = 1, indexBegin, indexEnd;
514        for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
515        indexBegin = 0;
516        if (globalSizeIndex <= clientSize)
517        {
518          indexBegin = rank%globalSizeIndex;
519          indexEnd = indexBegin;
520        }
521        else
522        {
523          for (int i = 0; i < clientSize; ++i)
524          {
525            range = globalSizeIndex / clientSize;
526            if (i < (globalSizeIndex%clientSize)) ++range;
527            if (i == client->clientRank) break;
528            indexBegin += range;
529          }
530          indexEnd = indexBegin + range - 1;
531        }
532
533        CArray<size_t,1> globalIndex(index.numElements());
534        for (size_t idx = 0; idx < globalIndex.numElements(); ++idx)
535          globalIndex(idx) = index(idx);
536
537        CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
538        serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
539        CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
540        clientServerMap->computeServerIndexMapping(globalIndex);
541        globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
542        delete clientServerMap;
543      }
544      else
545      {
546        std::vector<size_t> globalIndexServer(n_glo.getValue());
547        for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
548        {
549          globalIndexServer[idx] = idx;
550        }
551
552        for (int idx = 0; idx < nbServer; ++idx)
553        {
554          globalIndexAxisOnServer[idx] = globalIndexServer;
555        }
556      }
557
558      indSrv_.swap(globalIndexAxisOnServer);
559
560      CClientServerMapping::GlobalIndexMap::const_iterator it  = indSrv_.begin(),
561                                                           ite = indSrv_.end();
562
563      connectedServerRank_.clear();
564      for (it = indSrv_.begin(); it != ite; ++it) {
565        connectedServerRank_.push_back(it->first);
566      }
567
568      nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
569    }
570  }
571
572   void CAxis::computeWrittenIndex()
573   { 
574      if (computedWrittenIndex_) return;
575      computedWrittenIndex_ = true;
576
577      CContext* context=CContext::getCurrent();     
578      CContextServer* server = context->server; 
579
580      std::vector<int> nBegin(1), nSize(1), nBeginGlobal(1), nGlob(1);
581      nBegin[0]       = zoom_begin;
582      nSize[0]        = zoom_n;   
583      nBeginGlobal[0] = 0; 
584      nGlob[0]        = n_glo;
585      CDistributionServer srvDist(server->intraCommSize, nBegin, nSize, nBeginGlobal, nGlob); 
586      const CArray<size_t,1>& writtenGlobalIndex  = srvDist.getGlobalIndex();
587
588      size_t nbWritten = 0, indGlo;     
589      boost::unordered_map<size_t,size_t>::const_iterator itb = globalLocalIndexMap_.begin(),
590                                                          ite = globalLocalIndexMap_.end(), it;         
591      CArray<size_t,1>::const_iterator itSrvb = writtenGlobalIndex.begin(),
592                                       itSrve = writtenGlobalIndex.end(), itSrv; 
593      if (!zoomByIndex())
594      {   
595        for (itSrv = itSrvb; itSrv != itSrve; ++itSrv)
596        {
597          indGlo = *itSrv;
598          if (ite != globalLocalIndexMap_.find(indGlo))
599          {         
600            ++nbWritten;
601          }                 
602        }
603
604        localIndexToWriteOnServer.resize(nbWritten);
605
606        nbWritten = 0;
607        for (itSrv = itSrvb; itSrv != itSrve; ++itSrv)
608        {
609          indGlo = *itSrv;
610          if (ite != globalLocalIndexMap_.find(indGlo))
611          {
612            localIndexToWriteOnServer(nbWritten) = globalLocalIndexMap_[indGlo];
613            ++nbWritten;
614          }                 
615        }
616      }
617      else
618      {
619        nbWritten = 0;
620        boost::unordered_map<size_t,size_t>::const_iterator itb = globalLocalIndexMap_.begin(),
621                                                            ite = globalLocalIndexMap_.end(), it;
622        for (int i = 0; i < zoom_index.numElements(); ++i)
623        {
624           if (ite != globalLocalIndexMap_.find(zoom_index(i)))
625            ++nbWritten;
626        }
627
628        localIndexToWriteOnServer.resize(nbWritten);
629
630        nbWritten = 0;
631        for (int i = 0; i < zoom_index.numElements(); ++i)
632        {
633           if (ite != globalLocalIndexMap_.find(zoom_index(i)))
634           {
635             localIndexToWriteOnServer(nbWritten) = globalLocalIndexMap_[zoom_index(i)];
636             ++nbWritten;
637           }
638        }
639      }
640
641      if (isCompressible())
642      {
643        nbWritten = 0;
644        boost::unordered_map<size_t,size_t> localGlobalIndexMap;
645        for (itSrv = itSrvb; itSrv != itSrve; ++itSrv)
646        {
647          indGlo = *itSrv;
648          if (ite != globalLocalIndexMap_.find(indGlo))
649          {
650            localGlobalIndexMap[localIndexToWriteOnServer(nbWritten)] = indGlo;
651            ++nbWritten;
652          }                 
653        }
654
655        nbWritten = 0;
656        for (int idx = 0; idx < data_index.numElements(); ++idx)
657        {
658          if (localGlobalIndexMap.end() != localGlobalIndexMap.find(data_index(idx)))
659          {
660            ++nbWritten;
661          }
662        }
663
664        compressedIndexToWriteOnServer.resize(nbWritten);
665        nbWritten = 0;
666        for (int idx = 0; idx < data_index.numElements(); ++idx)
667        {
668          if (localGlobalIndexMap.end() != localGlobalIndexMap.find(data_index(idx)))
669          {
670            compressedIndexToWriteOnServer(nbWritten) = localGlobalIndexMap[data_index(idx)];
671            ++nbWritten;
672          }
673        }
674
675        numberWrittenIndexes_ = nbWritten;
676        if (isDistributed())
677        {
678               
679          MPI_Allreduce(&numberWrittenIndexes_, &totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
680          MPI_Scan(&numberWrittenIndexes_, &offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
681          offsetWrittenIndexes_ -= numberWrittenIndexes_;
682        }
683        else
684          totalNumberWrittenIndexes_ = numberWrittenIndexes_;
685      }
686
687   }
688
689
690
691  void CAxis::sendDistributionAttribute(const std::vector<int>& globalDim, int orderPositionInGrid,
692                                        CServerDistributionDescription::ServerDistributionType distType)
693  {
694    CContext* context = CContext::getCurrent();
695
696    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 0) : 1;
697    for (int i = 0; i < nbSrvPools; ++i)
698    {
699      CContextClient* contextClientTmp = (context->hasServer) ? context->clientPrimServer[i]
700                                                                         : context->client;
701      int nbServer = contextClientTmp->serverSize;
702
703      CServerDistributionDescription serverDescription(globalDim, nbServer);
704      serverDescription.computeServerDistribution();
705
706      std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
707      std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
708
709      globalDimGrid.resize(globalDim.size());
710      for (int idx = 0; idx < globalDim.size(); ++idx) globalDimGrid(idx) = globalDim[idx];
711
712      CEventClient event(getType(),EVENT_ID_DISTRIBUTION_ATTRIBUTE);
713      if (contextClientTmp->isServerLeader())
714      {
715        std::list<CMessage> msgs;
716
717        const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
718        for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
719        {
720          // Use const int to ensure CMessage holds a copy of the value instead of just a reference
721          const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
722          const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
723          const int end   = begin + ni - 1;         
724
725          msgs.push_back(CMessage());
726          CMessage& msg = msgs.back();
727          msg << this->getId();
728          msg << ni << begin << end;
729          // msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
730          msg << isCompressible_;
731          msg << orderPositionInGrid;
732          msg << globalDimGrid;
733
734          event.push(*itRank,1,msg);
735        }
736        contextClientTmp->sendEvent(event);
737      }
738      else contextClientTmp->sendEvent(event);
739    }
740  }
741
742  void CAxis::sendNonDistributedAttributes()
743  {
744    CContext* context = CContext::getCurrent();
745
746    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1;
747    for (int p = 0; p < nbSrvPools; ++p)
748    {
749      CContextClient* client = (0 != context->clientPrimServer.size()) ? context->clientPrimServer[p] : context->client;
750
751      CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_ATTRIBUTES);
752      size_t nbIndex = index.numElements();
753      size_t nbDataIndex = 0;
754
755      for (int idx = 0; idx < data_index.numElements(); ++idx)
756      {
757        int ind = data_index(idx);
758        if (ind >= 0 && ind < nbIndex) ++nbDataIndex;
759      }
760
761      CArray<int,1> dataIndex(nbDataIndex);
762      nbDataIndex = 0;
763      for (int idx = 0; idx < data_index.numElements(); ++idx)
764      {
765        int ind = data_index(idx);
766        if (ind >= 0 && ind < nbIndex)
767        {
768          dataIndex(nbDataIndex) = ind;
769          ++nbDataIndex;
770        }
771      }
772
773      if (client->isServerLeader())
774      {
775        std::list<CMessage> msgs;
776
777        const std::list<int>& ranks = client->getRanksServerLeader();
778        for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
779        {
780          msgs.push_back(CMessage());
781          CMessage& msg = msgs.back();
782          msg << this->getId();
783          msg << index.getValue() << dataIndex << mask.getValue();
784          msg << hasValue;
785          if (hasValue) msg << value.getValue();
786          msg << hasBounds_;
787          if (hasBounds_) msg << bounds.getValue();
788
789          event.push(*itRank, 1, msg);
790        }
791        client->sendEvent(event);
792      }
793      else client->sendEvent(event);
794    }
795  }
796
797  void CAxis::recvNonDistributedAttributes(CEventServer& event)
798  {
799    list<CEventServer::SSubEvent>::iterator it;
800    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
801    {
802      CBufferIn* buffer = it->buffer;
803      string axisId;
804      *buffer >> axisId;
805      get(axisId)->recvNonDistributedAttributes(it->rank, *buffer);
806    }
807  }
808
809  void CAxis::recvNonDistributedAttributes(int rank, CBufferIn& buffer)
810  { 
811    CArray<int,1> tmp_index, tmp_data_index, tmp_zoom_index;
812    CArray<bool,1> tmp_mask;
813    CArray<double,1> tmp_val;
814    CArray<double,2> tmp_bnds;
815
816    buffer >> tmp_index;
817    index.reference(tmp_index);
818    buffer >> tmp_data_index;
819    data_index.reference(tmp_data_index);
820    buffer >> tmp_mask;
821    mask.reference(tmp_mask);
822
823    buffer >> hasValue;
824    if (hasValue)
825    {
826      buffer >> tmp_val;
827      value.reference(tmp_val);
828    }
829
830    buffer >> hasBounds_;
831    if (hasBounds_)
832    {
833      buffer >> tmp_bnds;
834      bounds.reference(tmp_bnds);
835    }
836
837    data_begin.setValue(0);
838    globalLocalIndexMap_.rehash(std::ceil(index.numElements()/globalLocalIndexMap_.max_load_factor()));
839    for (int idx = 0; idx < index.numElements(); ++idx) globalLocalIndexMap_[idx] = index(idx);
840  }
841
842  void CAxis::sendDistributedAttributes(void)
843  {
844    int ns, n, i, j, ind, nv, idx;
845    CContext* context = CContext::getCurrent();
846   
847    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1;
848    for (int p = 0; p < nbSrvPools; ++p)
849    {
850      CContextClient* client = (0 != context->clientPrimServer.size()) ? context->clientPrimServer[p] : context->client;
851
852      CEventClient eventData(getType(), EVENT_ID_DISTRIBUTED_ATTRIBUTES);
853
854      list<CMessage> listData;
855      list<CArray<int,1> > list_indi, list_dataInd, list_zoomInd;
856      list<CArray<bool,1> > list_mask;
857      list<CArray<double,1> > list_val;
858      list<CArray<double,2> > list_bounds;
859
860      int nbIndex = index.numElements();
861      CArray<int,1> dataIndex(nbIndex);
862      dataIndex = -1;
863      for (int inx = 0; inx < data_index.numElements(); ++inx)
864      {
865        if (0 <= data_index(inx) && data_index(inx) < nbIndex)
866          dataIndex(inx) = data_index(inx);
867      }
868
869      boost::unordered_map<int, std::vector<size_t> >::const_iterator it, iteMap;
870      iteMap = indSrv_.end();
871      for (int k = 0; k < connectedServerRank_.size(); ++k)
872      {
873        int nbData = 0;
874        int rank = connectedServerRank_[k];
875        int nbSendingClient = nbConnectedClients_[rank];
876        it = indSrv_.find(rank);
877        if (iteMap != it)
878          nbData = it->second.size();
879
880        list_indi.push_back(CArray<int,1>(nbData));
881        list_dataInd.push_back(CArray<int,1>(nbData));       
882        list_mask.push_back(CArray<bool,1>(nbData));
883
884        if (hasValue)
885          list_val.push_back(CArray<double,1>(nbData));
886
887        if (hasBounds_)
888        {
889          list_bounds.push_back(CArray<double,2>(2,nbData));
890        }
891
892        CArray<int,1>& indi = list_indi.back();
893        CArray<int,1>& dataIndi = list_dataInd.back();       
894        CArray<bool,1>& maskIndi = list_mask.back();
895
896        for (n = 0; n < nbData; ++n)
897        {
898          idx = static_cast<int>(it->second[n]);
899          indi(n) = idx;
900
901          ind = globalLocalIndexMap_[idx];
902          dataIndi(n) = dataIndex(ind);
903          maskIndi(n) = mask(ind);
904
905          if (hasValue)
906          {
907            CArray<double,1>& val = list_val.back();
908            val(n) = value(ind);
909          }
910
911          if (hasBounds_)
912          {
913            CArray<double,2>& boundsVal = list_bounds.back();
914            boundsVal(0, n) = bounds(0,n);
915            boundsVal(1, n) = bounds(1,n);
916          }
917        }
918
919        listData.push_back(CMessage());
920        listData.back() << this->getId()
921                        << list_indi.back() << list_dataInd.back() << list_mask.back();
922
923        listData.back() << hasValue;
924        if (hasValue)
925          listData.back() << list_val.back();
926
927        listData.back() << hasBounds_;
928        if (hasBounds_)
929          listData.back() << list_bounds.back();
930
931        eventData.push(rank, nbConnectedClients_[rank], listData.back());
932      }
933
934      client->sendEvent(eventData);
935    }
936  }
937
938  void CAxis::recvDistributedAttributes(CEventServer& event)
939  {
940    string axisId;
941    vector<int> ranks;
942    vector<CBufferIn*> buffers;
943
944    list<CEventServer::SSubEvent>::iterator it;
945    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
946    {
947      ranks.push_back(it->rank);
948      CBufferIn* buffer = it->buffer;
949      *buffer >> axisId;
950      buffers.push_back(buffer);
951    }
952    get(axisId)->recvDistributedAttributes(ranks, buffers);
953  }
954
955
956  void CAxis::recvDistributedAttributes(vector<int>& ranks, vector<CBufferIn*> buffers)
957  {
958    int nbReceived = ranks.size();
959    vector<CArray<int,1> > vec_indi(nbReceived), vec_dataInd(nbReceived), vec_zoomInd(nbReceived);   
960    vector<CArray<bool,1> > vec_mask(nbReceived);
961    vector<CArray<double,1> > vec_val(nbReceived);
962    vector<CArray<double,2> > vec_bounds(nbReceived);
963   
964    for (int idx = 0; idx < nbReceived; ++idx)
965    {     
966      CBufferIn& buffer = *buffers[idx];
967      buffer >> vec_indi[idx];
968      buffer >> vec_dataInd[idx];     
969      buffer >> vec_mask[idx];
970
971      buffer >> hasValue;
972      if (hasValue)
973        buffer >> vec_val[idx];
974
975      buffer >> hasBounds_;
976      if (hasBounds_)
977        buffer >> vec_bounds[idx];
978    }
979
980    int nbData = 0;
981    for (int idx = 0; idx < nbReceived; ++idx)
982    {
983      nbData += vec_indi[idx].numElements();
984    }
985
986    index.resize(nbData);
987    globalLocalIndexMap_.rehash(std::ceil(index.numElements()/globalLocalIndexMap_.max_load_factor()));
988    CArray<int,1> nonCompressedData(nbData);   
989    mask.resize(nbData);
990    if (hasValue)
991      value.resize(nbData);
992    if (hasBounds_)
993      bounds.resize(2,nbData);
994
995    nbData = 0;
996    for (int idx = 0; idx < nbReceived; ++idx)
997    {
998      CArray<int,1>& indi = vec_indi[idx];
999      CArray<int,1>& dataIndi = vec_dataInd[idx];
1000      CArray<bool,1>& maskIndi = vec_mask[idx];
1001      int nb = indi.numElements();
1002      for (int n = 0; n < nb; ++n)
1003      {
1004        index(nbData) = indi(n);
1005        globalLocalIndexMap_[indi(n)] = nbData;
1006        nonCompressedData(nbData) = (0 <= dataIndi(n)) ? nbData : -1;
1007        mask(nbData) = maskIndi(n);
1008        if (hasValue)
1009          value(nbData) = vec_val[idx](n);
1010        if (hasBounds_)
1011        {
1012          bounds(0,nbData) = vec_bounds[idx](0,n);
1013          bounds(1,nbData) = vec_bounds[idx](1,n);
1014        }
1015        ++nbData;
1016      }
1017    }
1018
1019    int nbIndex = index.numElements();
1020    int nbCompressedData = 0; 
1021    for (int idx = 0; idx < nonCompressedData.numElements(); ++idx)
1022    {
1023      if (0 <= nonCompressedData(idx) && nonCompressedData(idx) < nbIndex)
1024        ++nbCompressedData;       
1025    }
1026
1027    data_index.resize(nbCompressedData);
1028    nbCompressedData = 0;
1029    for (int idx = 0; idx < nonCompressedData.numElements(); ++idx)
1030    {
1031      if (0 <= nonCompressedData(idx) && nonCompressedData(idx) < nbIndex)
1032      {
1033        data_index(nbCompressedData) = nonCompressedData(idx);
1034        ++nbCompressedData;       
1035      }
1036    }
1037    data_begin.setValue(0);
1038
1039    if (hasLabel)
1040    {
1041      //label_srv(ind_srv) = labelVal( ind);
1042    }
1043  }
1044
1045  void CAxis::recvDistributionAttribute(CEventServer& event)
1046  {
1047    CBufferIn* buffer = event.subEvents.begin()->buffer;
1048    string axisId;
1049    *buffer >> axisId;
1050    get(axisId)->recvDistributionAttribute(*buffer);
1051  }
1052
1053  void CAxis::recvDistributionAttribute(CBufferIn& buffer)
1054  {
1055    int ni_srv, begin_srv, end_srv;
1056    int global_zoom_end, zoom_end;
1057    bool zoomIndex = zoomByIndex();
1058   
1059    std::vector<int> zoom_index_tmp;
1060    std::vector<int>::iterator itZoomBegin, itZoomEnd, itZoom;
1061
1062    buffer >> ni_srv >> begin_srv >> end_srv;
1063    // buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;   
1064    buffer >> isCompressible_;
1065    buffer >> orderPosInGrid;
1066    buffer >> globalDimGrid;   
1067
1068    // Set up new local size of axis on the receiving clients
1069    n.setValue(ni_srv);
1070    begin.setValue(begin_srv);
1071
1072    // If we have zoom by index then process it
1073    if (zoomIndex)
1074    {
1075      zoom_index_tmp.resize(global_zoom_index.numElements());
1076      std::copy(global_zoom_index.begin(), global_zoom_index.end(), zoom_index_tmp.begin());
1077      std::sort(zoom_index_tmp.begin(), zoom_index_tmp.end());
1078      itZoomBegin = std::lower_bound(zoom_index_tmp.begin(), zoom_index_tmp.end(), begin_srv);
1079      itZoomEnd   = std::upper_bound(zoom_index_tmp.begin(), zoom_index_tmp.end(), end_srv);     
1080      int sz = std::distance(itZoomBegin, itZoomEnd);
1081      zoom_index.resize(sz);
1082      itZoom = itZoomBegin;
1083      for (int i = 0; i < sz; ++i, ++itZoom)
1084      {
1085        zoom_index(i) = *(itZoom);
1086      }
1087    }
1088
1089    global_zoom_begin = zoomIndex ? 0 : global_zoom_begin ;
1090    global_zoom_n     = zoomIndex ? zoom_index_tmp.size() : global_zoom_n;
1091    global_zoom_end   = global_zoom_begin + global_zoom_n - 1;
1092
1093    zoom_begin = zoomIndex ? std::distance(itZoomBegin, zoom_index_tmp.begin())
1094                           : global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
1095    zoom_end   = zoomIndex ? std::distance(zoom_index_tmp.begin(), itZoomEnd) - 1 
1096                           : global_zoom_end < end_srv ? global_zoom_end : end_srv ;
1097    zoom_n     = zoom_end - zoom_begin + 1;
1098
1099    if (zoom_n<=0)
1100    {
1101      zoom_begin = 0; zoom_n = 0;
1102    }
1103
1104    if (n_glo == n)
1105    {
1106      zoom_begin = zoomIndex ? std::distance(itZoomBegin, zoom_index_tmp.begin())
1107                             : global_zoom_begin;     
1108      zoom_n     = zoomIndex ? zoom_index_tmp.size() : global_zoom_n;
1109    }
1110  }
1111
1112  /*!
1113    Compare two axis objects.
1114    They are equal if only if they have identical attributes as well as their values.
1115    Moreover, they must have the same transformations.
1116  \param [in] axis Compared axis
1117  \return result of the comparison
1118  */
1119  bool CAxis::isEqual(CAxis* obj)
1120  {
1121    vector<StdString> excludedAttr;
1122    excludedAttr.push_back("axis_ref");
1123
1124    bool objEqual = SuperClass::isEqual(obj, excludedAttr);   
1125    if (!objEqual) return objEqual;
1126
1127    TransMapTypes thisTrans = this->getAllTransformations();
1128    TransMapTypes objTrans  = obj->getAllTransformations();
1129
1130    TransMapTypes::const_iterator it, itb, ite;
1131    std::vector<ETranformationType> thisTransType, objTransType;
1132    for (it = thisTrans.begin(); it != thisTrans.end(); ++it)
1133      thisTransType.push_back(it->first);
1134    for (it = objTrans.begin(); it != objTrans.end(); ++it)
1135      objTransType.push_back(it->first);
1136
1137    if (thisTransType.size() != objTransType.size()) return false;
1138    for (int idx = 0; idx < thisTransType.size(); ++idx)
1139      objEqual &= (thisTransType[idx] == objTransType[idx]);
1140
1141    return objEqual;
1142  }
1143
1144  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
1145  {
1146    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
1147    return transformationMap_.back().second;
1148  }
1149
1150  bool CAxis::hasTransformation()
1151  {
1152    return (!transformationMap_.empty());
1153  }
1154
1155  void CAxis::setTransformations(const TransMapTypes& axisTrans)
1156  {
1157    transformationMap_ = axisTrans;
1158  }
1159
1160  CAxis::TransMapTypes CAxis::getAllTransformations(void)
1161  {
1162    return transformationMap_;
1163  }
1164
1165  void CAxis::duplicateTransformation(CAxis* src)
1166  {
1167    if (src->hasTransformation())
1168    {
1169      this->setTransformations(src->getAllTransformations());
1170    }
1171  }
1172
1173  /*!
1174   * Go through the hierarchy to find the axis from which the transformations must be inherited
1175   */
1176  void CAxis::solveInheritanceTransformation()
1177  {
1178    if (hasTransformation() || !hasDirectAxisReference())
1179      return;
1180
1181    CAxis* axis = this;
1182    std::vector<CAxis*> refAxis;
1183    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
1184    {
1185      refAxis.push_back(axis);
1186      axis = axis->getDirectAxisReference();
1187    }
1188
1189    if (axis->hasTransformation())
1190      for (size_t i = 0; i < refAxis.size(); ++i)
1191        refAxis[i]->setTransformations(axis->getAllTransformations());
1192  }
1193
1194  void CAxis::parse(xml::CXMLNode & node)
1195  {
1196    SuperClass::parse(node);
1197
1198    if (node.goToChildElement())
1199    {
1200      StdString nodeElementName;
1201      do
1202      {
1203        StdString nodeId("");
1204        if (node.getAttributes().end() != node.getAttributes().find("id"))
1205        { nodeId = node.getAttributes()["id"]; }
1206
1207        nodeElementName = node.getElementName();
1208        std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_.end(), it;
1209        it = transformationMapList_.find(nodeElementName);
1210        if (ite != it)
1211        {
1212          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1213                                                                                                               nodeId,
1214                                                                                                               &node)));
1215        }
1216        else
1217        {
1218          ERROR("void CAxis::parse(xml::CXMLNode & node)",
1219                << "The transformation " << nodeElementName << " has not been supported yet.");
1220        }
1221      } while (node.goToNextElement()) ;
1222      node.goToParentElement();
1223    }
1224  }
1225
1226  DEFINE_REF_FUNC(Axis,axis)
1227
1228   ///---------------------------------------------------------------
1229
1230} // namespace xios
Note: See TracBrowser for help on using the repository browser.