source: XIOS/dev/XIOS_DEV_CMIP6/src/node/axis.cpp @ 1215

Last change on this file since 1215 was 1215, checked in by ymipsl, 7 years ago

Distribute files on servers 2 -> Ok test_dcmip2

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 41.2 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isClientAfterTransformationChecked(false)
27      , hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
29      , transformationMap_(), hasValue(false), hasLabel(false)
30      , computedWrittenIndex_(false)
31   {
32   }
33
34   CAxis::CAxis(const StdString & id)
35      : CObjectTemplate<CAxis>(id)
36      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
37      , isClientAfterTransformationChecked(false)
38      , hasBounds_(false), isCompressible_(false)
39      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
40      , transformationMap_(), hasValue(false), hasLabel(false)
41      , computedWrittenIndex_(false)
42   {
43   }
44
45   CAxis::~CAxis(void)
46   { /* Ne rien faire de plus */ }
47
48   std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
49   bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_);
50   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
51   {
52     m["zoom_axis"] = TRANS_ZOOM_AXIS;
53     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
54     m["inverse_axis"] = TRANS_INVERSE_AXIS;
55     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
56     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
57   }
58
59   ///---------------------------------------------------------------
60
61   const std::set<StdString> & CAxis::getRelFiles(void) const
62   {
63      return (this->relFiles);
64   }
65
66   bool CAxis::IsWritten(const StdString & filename) const
67   {
68      return (this->relFiles.find(filename) != this->relFiles.end());
69   }
70
71   bool CAxis::isWrittenCompressed(const StdString& filename) const
72   {
73      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
74   }
75
76   bool CAxis::isDistributed(void) const
77   {
78      bool distributed = (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
79             (!this->n.isEmpty() && (this->n != this->n_glo));
80      // A same stupid condition to make sure that if there is only one client, axis
81      // should be considered to be distributed. This should be a temporary solution     
82      distributed |= (1 == CContext::getCurrent()->client->clientSize);
83      return distributed;
84   }
85
86   /*!
87    * Test whether the data defined on the axis can be outputted in a compressed way.
88    *
89    * \return true if and only if a mask was defined for this axis
90    */
91   bool CAxis::isCompressible(void) const
92   {
93      return isCompressible_;
94   }
95
96   void CAxis::addRelFile(const StdString & filename)
97   {
98      this->relFiles.insert(filename);
99   }
100
101   void CAxis::addRelFileCompressed(const StdString& filename)
102   {
103      this->relFilesCompressed.insert(filename);
104   }
105
106   //----------------------------------------------------------------
107
108   /*!
109     Returns the number of indexes written by each server.
110     \return the number of indexes written by each server
111   */
112   int CAxis::getNumberWrittenIndexes() const
113   {
114     return numberWrittenIndexes_;
115   }
116
117   /*!
118     Returns the total number of indexes written by the servers.
119     \return the total number of indexes written by the servers
120   */
121   int CAxis::getTotalNumberWrittenIndexes() const
122   {
123     return totalNumberWrittenIndexes_;
124   }
125
126   /*!
127     Returns the offset of indexes written by each server.
128     \return the offset of indexes written by each server
129   */
130   int CAxis::getOffsetWrittenIndexes() const
131   {
132     return offsetWrittenIndexes_;
133   }
134
135   //----------------------------------------------------------------
136
137   /*!
138    * Compute the minimum buffer size required to send the attributes to the server(s).
139    *
140    * \return A map associating the server rank with its minimum buffer size.
141    */
142   std::map<int, StdSize> CAxis::getAttributesBufferSize()
143   {
144     // For now the assumption is that secondary server pools consist of the same number of procs.
145     // CHANGE the line below if the assumption changes.
146     CContext* context = CContext::getCurrent();
147     CContextClient* client = (0 != context->clientPrimServer.size()) ? context->clientPrimServer[0] : context->client;
148
149     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
150
151     bool isNonDistributed = (n == n_glo);
152
153     if (client->isServerLeader())
154     {
155       // size estimation for sendServerAttribut
156       size_t size = 6 * sizeof(size_t);
157       // size estimation for sendNonDistributedValue
158       if (isNonDistributed)
159         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
160       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
161
162       const std::list<int>& ranks = client->getRanksServerLeader();
163       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
164       {
165         if (size > attributesSizes[*itRank])
166           attributesSizes[*itRank] = size;
167       }
168     }
169
170     if (!isNonDistributed)
171     {
172       // size estimation for sendDistributedValue
173       boost::unordered_map<int, vector<size_t> >::const_iterator it, ite = indSrv_.end();
174       for (it = indSrv_.begin(); it != ite; ++it)
175       {
176         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
177         if (isCompressible_)
178           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
179
180         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
181         if (hasBounds_)
182           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
183 
184         if (hasLabel)
185           sizeValEvent += CArray<StdString,1>::size(it->second.size());
186
187         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
188         if (size > attributesSizes[it->first])
189           attributesSizes[it->first] = size;
190       }
191     }
192
193     return attributesSizes;
194   }
195
196   //----------------------------------------------------------------
197
198   StdString CAxis::GetName(void)   { return (StdString("axis")); }
199   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
200   ENodeType CAxis::GetType(void)   { return (eAxis); }
201
202   //----------------------------------------------------------------
203
204   CAxis* CAxis::createAxis()
205   {
206     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
207     return axis;
208   }
209
210   /*!
211     Check common attributes of an axis.
212     This check should be done in the very beginning of work flow
213   */
214   void CAxis::checkAttributes(void)
215   {
216      if (this->n_glo.isEmpty())
217        ERROR("CAxis::checkAttributes(void)",
218              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
219              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
220      StdSize size = this->n_glo.getValue();
221
222      if (!this->index.isEmpty())
223      {
224        if (n.isEmpty()) n = index.numElements();
225
226        // It's not so correct but if begin is not the first value of index
227        // then data on the local axis has user-defined distribution. In this case, begin has no meaning.
228        if (begin.isEmpty()) begin = index(0);         
229      }
230      else 
231      {
232        if (!this->begin.isEmpty())
233        {
234          if (begin < 0 || begin > size - 1)
235            ERROR("CAxis::checkAttributes(void)",
236                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
237                  << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
238        }
239        else this->begin.setValue(0);
240
241        if (!this->n.isEmpty())
242        {
243          if (n < 0 || n > size)
244            ERROR("CAxis::checkAttributes(void)",
245                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
246                  << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
247        }
248        else this->n.setValue(size);
249
250        {
251          index.resize(n);
252          for (int i = 0; i < n; ++i) index(i) = i+begin;
253        }
254      }
255
256      if (!this->value.isEmpty())
257      {
258        StdSize true_size = value.numElements();
259        if (this->n.getValue() != true_size)
260          ERROR("CAxis::checkAttributes(void)",
261                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
262                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
263        this->hasValue = true;
264      }
265
266      this->checkData();
267      this->checkZoom();
268      this->checkMask();
269      this->checkBounds();
270      this->checkLabel();
271   }
272
273   /*!
274      Check the validity of data and fill in values if any.
275   */
276   void CAxis::checkData()
277   {
278      if (data_begin.isEmpty()) data_begin.setValue(0);
279
280      if (data_n.isEmpty())
281      {
282        data_n.setValue(n);
283      }
284      else if (data_n.getValue() < 0)
285      {
286        ERROR("CAxis::checkData(void)",
287              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
288              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
289      }
290
291      if (data_index.isEmpty())
292      {
293        data_index.resize(data_n);
294        for (int i = 0; i < data_n; ++i) data_index(i) = i;
295      }
296   }
297
298   /*!
299     Check validity of zoom info and fill in values if any.
300   */
301   void CAxis::checkZoom(void)
302   {
303     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
304     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
305     if (zoom_index.isEmpty())
306     {
307       zoom_index.setValue(index.getValue());
308     }
309     if (zoom_n.isEmpty()) zoom_n.setValue(n);
310     if (zoom_begin.isEmpty()) zoom_begin.setValue(begin);
311   }
312
313    size_t CAxis::getGlobalWrittenSize(void)
314    {
315      if (zoomByIndex()) return  zoom_index.numElements();
316      else return global_zoom_n ;
317    }
318
319   /*!
320     Check validity of mask info and fill in values if any.
321   */
322   void CAxis::checkMask()
323   {
324      if (!mask.isEmpty())
325      {
326         if (mask.extent(0) != n)
327           ERROR("CAxis::checkMask(void)",
328                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
329                 << "The mask does not have the same size as the local domain." << std::endl
330                 << "Local size is " << n.getValue() << "." << std::endl
331                 << "Mask size is " << mask.extent(0) << ".");
332      }
333      else // (mask.isEmpty())
334      { // If no mask was defined, we create a default one without any masked point.
335         mask.resize(n);
336         for (int i = 0; i < n; ++i)
337         {
338           mask(i) = true;
339         }
340      }
341   }
342
343   /*!
344     Check validity of bounds info and fill in values if any.
345   */
346   void CAxis::checkBounds()
347   {
348     if (!bounds.isEmpty())
349     {
350       if (bounds.extent(0) != 2 || bounds.extent(1) != n)
351         ERROR("CAxis::checkAttributes(void)",
352               << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
353               << "Axis size is " << n.getValue() << "." << std::endl
354               << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
355       hasBounds_ = true;
356     }
357     else hasBounds_ = false;
358   }
359
360  void CAxis::checkLabel()
361  {
362    if (!label.isEmpty())
363    {
364      if (label.extent(0) != n)
365        ERROR("CAxis::checkLabel(void)",
366              << "The label array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension of axis size." << std::endl
367              << "Axis size is " << n.getValue() << "." << std::endl
368              << "label size is "<< label.extent(0)<<  " .");
369      hasLabel = true;
370    }
371    else hasLabel = false;
372  }
373
374  void CAxis::checkEligibilityForCompressedOutput()
375  {
376    // We don't check if the mask is valid here, just if a mask has been defined at this point.
377    isCompressible_ = !mask.isEmpty();
378  }
379
380  bool CAxis::zoomByIndex()
381  {
382    return (!global_zoom_index.isEmpty() && (0 != global_zoom_index.numElements()));
383  }
384
385   bool CAxis::dispatchEvent(CEventServer& event)
386   {
387      if (SuperClass::dispatchEvent(event)) return true;
388      else
389      {
390        switch(event.type)
391        {
392           case EVENT_ID_DISTRIBUTION_ATTRIBUTE :
393             recvDistributionAttribute(event);
394             return true;
395             break;
396          case EVENT_ID_NON_DISTRIBUTED_ATTRIBUTES:
397            recvNonDistributedAttributes(event);
398            return true;
399            break;
400          case EVENT_ID_DISTRIBUTED_ATTRIBUTES:
401            recvDistributedAttributes(event);
402            return true;
403            break;
404           default :
405             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
406                    << "Unknown Event");
407           return false;
408         }
409      }
410   }
411
412   /*!
413     Check attributes on client side (This name is still adequate???)
414   */
415   void CAxis::checkAttributesOnClient()
416   {
417     if (this->areClientAttributesChecked_) return;
418
419     this->checkAttributes();
420
421     this->areClientAttributesChecked_ = true;
422   }
423
424   /*
425     The (spatial) transformation sometimes can change attributes of an axis. Therefore, we should recheck them.
426   */
427   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
428                                                          CServerDistributionDescription::ServerDistributionType distType)
429   {
430     CContext* context=CContext::getCurrent() ;
431
432     if (this->isClientAfterTransformationChecked) return;
433     if (context->hasClient)
434     {
435       if (index.numElements() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
436     }
437
438     this->isClientAfterTransformationChecked = true;
439   }
440
441   // Send all checked attributes to server
442   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
443                                     CServerDistributionDescription::ServerDistributionType distType)
444   {
445     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
446     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
447     CContext* context = CContext::getCurrent();
448
449     if (this->isChecked) return;
450     if (context->hasClient) sendAttributes(globalDim, orderPositionInGrid, distType);   
451
452     this->isChecked = true;
453   }
454
455  /*!
456    Send attributes from one client to other clients
457    \param[in] globalDim global dimension of grid which contains this axis
458    \param[in] order
459  */
460  void CAxis::sendAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
461                             CServerDistributionDescription::ServerDistributionType distType)
462  {
463     if (index.numElements() == n_glo.getValue())
464       sendNonDistributedAttributes();
465     else
466     {
467       sendDistributedAttributes();       
468     }
469     sendDistributionAttribute(globalDim, orderPositionInGrid, distType);
470  }
471
472  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
473                                     CServerDistributionDescription::ServerDistributionType distType)
474  {
475    CContext* context = CContext::getCurrent();
476
477    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1;
478    for (int p = 0; p < nbSrvPools; ++p)
479    {
480      CContextClient* client = (0 != context->clientPrimServer.size()) ? context->clientPrimServer[p] : context->client;
481      int nbServer = client->serverSize;
482      int range, clientSize = client->clientSize;
483      int rank = client->clientRank;
484
485      size_t ni = this->n.getValue();
486      size_t ibegin = this->begin.getValue();
487      size_t global_zoom_end = global_zoom_begin+global_zoom_n-1;
488      size_t nZoomCount = 0;
489      size_t nbIndex = index.numElements();
490
491      for (size_t idx = 0; idx < nbIndex; ++idx)
492      {
493        globalLocalIndexMap_[index(idx)] = idx;
494      }
495      std::set<int> writtenInd;
496      if (isCompressible_)
497      {
498        for (int idx = 0; idx < data_index.numElements(); ++idx)
499        {
500          int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
501
502          if (ind >= 0 && ind < ni && mask(ind))
503          {
504            ind += ibegin;
505            if (ind >= global_zoom_begin && ind <= global_zoom_end)
506              writtenInd.insert(ind);
507          }
508        }
509      }
510
511      CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer, distType);
512      int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
513      CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
514      if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
515      {
516        std::vector<int> nGlobAxis(1);
517        nGlobAxis[0] = n_glo.getValue();
518
519        size_t globalSizeIndex = 1, indexBegin, indexEnd;
520        for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
521        indexBegin = 0;
522        if (globalSizeIndex <= clientSize)
523        {
524          indexBegin = rank%globalSizeIndex;
525          indexEnd = indexBegin;
526        }
527        else
528        {
529          for (int i = 0; i < clientSize; ++i)
530          {
531            range = globalSizeIndex / clientSize;
532            if (i < (globalSizeIndex%clientSize)) ++range;
533            if (i == client->clientRank) break;
534            indexBegin += range;
535          }
536          indexEnd = indexBegin + range - 1;
537        }
538
539        CArray<size_t,1> globalIndex(index.numElements());
540        for (size_t idx = 0; idx < globalIndex.numElements(); ++idx)
541          globalIndex(idx) = index(idx);
542
543        CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
544        serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
545        CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
546        clientServerMap->computeServerIndexMapping(globalIndex);
547        globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
548        delete clientServerMap;
549      }
550      else
551      {
552        std::vector<size_t> globalIndexServer(n_glo.getValue());
553        for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
554        {
555          globalIndexServer[idx] = idx;
556        }
557
558        for (int idx = 0; idx < nbServer; ++idx)
559        {
560          globalIndexAxisOnServer[idx] = globalIndexServer;
561        }
562      }
563
564      indSrv_.swap(globalIndexAxisOnServer);
565
566      CClientServerMapping::GlobalIndexMap::const_iterator it  = indSrv_.begin(),
567                                                           ite = indSrv_.end();
568
569      connectedServerRank_.clear();
570      for (it = indSrv_.begin(); it != ite; ++it) {
571        connectedServerRank_.push_back(it->first);
572      }
573
574      nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
575    }
576  }
577
578   void CAxis::computeWrittenIndex()
579   { 
580      if (computedWrittenIndex_) return;
581      computedWrittenIndex_ = true;
582
583      CContext* context=CContext::getCurrent();     
584      CContextServer* server = context->server; 
585
586      std::vector<int> nBegin(1), nSize(1), nBeginGlobal(1), nGlob(1);
587      nBegin[0]       = zoom_begin;
588      nSize[0]        = zoom_n;   
589      nBeginGlobal[0] = 0; 
590      nGlob[0]        = n_glo;
591      CDistributionServer srvDist(server->intraCommSize, nBegin, nSize, nBeginGlobal, nGlob); 
592      const CArray<size_t,1>& writtenGlobalIndex  = srvDist.getGlobalIndex();
593
594      size_t nbWritten = 0, indGlo;     
595      boost::unordered_map<size_t,size_t>::const_iterator itb = globalLocalIndexMap_.begin(),
596                                                          ite = globalLocalIndexMap_.end(), it;         
597      CArray<size_t,1>::const_iterator itSrvb = writtenGlobalIndex.begin(),
598                                       itSrve = writtenGlobalIndex.end(), itSrv; 
599      if (!zoomByIndex())
600      {   
601        for (itSrv = itSrvb; itSrv != itSrve; ++itSrv)
602        {
603          indGlo = *itSrv;
604          if (ite != globalLocalIndexMap_.find(indGlo))
605          {         
606            ++nbWritten;
607          }                 
608        }
609
610        localIndexToWriteOnServer.resize(nbWritten);
611
612        nbWritten = 0;
613        for (itSrv = itSrvb; itSrv != itSrve; ++itSrv)
614        {
615          indGlo = *itSrv;
616          if (ite != globalLocalIndexMap_.find(indGlo))
617          {
618            localIndexToWriteOnServer(nbWritten) = globalLocalIndexMap_[indGlo];
619            ++nbWritten;
620          }                 
621        }
622      }
623      else
624      {
625        nbWritten = 0;
626        boost::unordered_map<size_t,size_t>::const_iterator itb = globalLocalIndexMap_.begin(),
627                                                            ite = globalLocalIndexMap_.end(), it;
628        for (int i = 0; i < zoom_index.numElements(); ++i)
629        {
630           if (ite != globalLocalIndexMap_.find(zoom_index(i)))
631            ++nbWritten;
632        }
633
634        localIndexToWriteOnServer.resize(nbWritten);
635
636        nbWritten = 0;
637        for (int i = 0; i < zoom_index.numElements(); ++i)
638        {
639           if (ite != globalLocalIndexMap_.find(zoom_index(i)))
640           {
641             localIndexToWriteOnServer(nbWritten) = globalLocalIndexMap_[zoom_index(i)];
642             ++nbWritten;
643           }
644        }
645      }
646
647      if (isCompressible())
648      {
649        nbWritten = 0;
650        boost::unordered_map<size_t,size_t> localGlobalIndexMap;
651        for (itSrv = itSrvb; itSrv != itSrve; ++itSrv)
652        {
653          indGlo = *itSrv;
654          if (ite != globalLocalIndexMap_.find(indGlo))
655          {
656            localGlobalIndexMap[localIndexToWriteOnServer(nbWritten)] = indGlo;
657            ++nbWritten;
658          }                 
659        }
660
661        nbWritten = 0;
662        for (int idx = 0; idx < data_index.numElements(); ++idx)
663        {
664          if (localGlobalIndexMap.end() != localGlobalIndexMap.find(data_index(idx)))
665          {
666            ++nbWritten;
667          }
668        }
669
670        compressedIndexToWriteOnServer.resize(nbWritten);
671        nbWritten = 0;
672        for (int idx = 0; idx < data_index.numElements(); ++idx)
673        {
674          if (localGlobalIndexMap.end() != localGlobalIndexMap.find(data_index(idx)))
675          {
676            compressedIndexToWriteOnServer(nbWritten) = localGlobalIndexMap[data_index(idx)];
677            ++nbWritten;
678          }
679        }
680
681        numberWrittenIndexes_ = nbWritten;
682        if (isDistributed())
683        {
684               
685          MPI_Allreduce(&numberWrittenIndexes_, &totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
686          MPI_Scan(&numberWrittenIndexes_, &offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
687          offsetWrittenIndexes_ -= numberWrittenIndexes_;
688        }
689        else
690          totalNumberWrittenIndexes_ = numberWrittenIndexes_;
691      }
692
693   }
694
695
696
697  void CAxis::sendDistributionAttribute(const std::vector<int>& globalDim, int orderPositionInGrid,
698                                        CServerDistributionDescription::ServerDistributionType distType)
699  {
700    CContext* context = CContext::getCurrent();
701
702    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 0) : 1;
703    for (int i = 0; i < nbSrvPools; ++i)
704    {
705      CContextClient* contextClientTmp = (context->hasServer) ? context->clientPrimServer[i]
706                                                                         : context->client;
707      int nbServer = contextClientTmp->serverSize;
708
709      CServerDistributionDescription serverDescription(globalDim, nbServer);
710      serverDescription.computeServerDistribution();
711
712      std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
713      std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
714
715      globalDimGrid.resize(globalDim.size());
716      for (int idx = 0; idx < globalDim.size(); ++idx) globalDimGrid(idx) = globalDim[idx];
717
718      CEventClient event(getType(),EVENT_ID_DISTRIBUTION_ATTRIBUTE);
719      if (contextClientTmp->isServerLeader())
720      {
721        std::list<CMessage> msgs;
722
723        const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
724        for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
725        {
726          // Use const int to ensure CMessage holds a copy of the value instead of just a reference
727          const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
728          const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
729          const int end   = begin + ni - 1;         
730
731          msgs.push_back(CMessage());
732          CMessage& msg = msgs.back();
733          msg << this->getId();
734          msg << ni << begin << end;
735          // msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
736          msg << isCompressible_;
737          msg << orderPositionInGrid;
738          msg << globalDimGrid;
739
740          event.push(*itRank,1,msg);
741        }
742        contextClientTmp->sendEvent(event);
743      }
744      else contextClientTmp->sendEvent(event);
745    }
746  }
747
748  void CAxis::sendNonDistributedAttributes()
749  {
750    CContext* context = CContext::getCurrent();
751
752    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1;
753    for (int p = 0; p < nbSrvPools; ++p)
754    {
755      CContextClient* client = (0 != context->clientPrimServer.size()) ? context->clientPrimServer[p] : context->client;
756
757      CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_ATTRIBUTES);
758      size_t nbIndex = index.numElements();
759      size_t nbDataIndex = 0;
760
761      for (int idx = 0; idx < data_index.numElements(); ++idx)
762      {
763        int ind = data_index(idx);
764        if (ind >= 0 && ind < nbIndex) ++nbDataIndex;
765      }
766
767      CArray<int,1> dataIndex(nbDataIndex);
768      nbDataIndex = 0;
769      for (int idx = 0; idx < data_index.numElements(); ++idx)
770      {
771        int ind = data_index(idx);
772        if (ind >= 0 && ind < nbIndex)
773        {
774          dataIndex(nbDataIndex) = ind;
775          ++nbDataIndex;
776        }
777      }
778
779      if (client->isServerLeader())
780      {
781        std::list<CMessage> msgs;
782
783        const std::list<int>& ranks = client->getRanksServerLeader();
784        for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
785        {
786          msgs.push_back(CMessage());
787          CMessage& msg = msgs.back();
788          msg << this->getId();
789          msg << index.getValue() << dataIndex << mask.getValue();
790          msg << hasValue;
791          if (hasValue) msg << value.getValue();
792          msg << hasBounds_;
793          if (hasBounds_) msg << bounds.getValue();
794
795          event.push(*itRank, 1, msg);
796        }
797        client->sendEvent(event);
798      }
799      else client->sendEvent(event);
800    }
801  }
802
803  void CAxis::recvNonDistributedAttributes(CEventServer& event)
804  {
805    list<CEventServer::SSubEvent>::iterator it;
806    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
807    {
808      CBufferIn* buffer = it->buffer;
809      string axisId;
810      *buffer >> axisId;
811      get(axisId)->recvNonDistributedAttributes(it->rank, *buffer);
812    }
813  }
814
815  void CAxis::recvNonDistributedAttributes(int rank, CBufferIn& buffer)
816  { 
817    CArray<int,1> tmp_index, tmp_data_index, tmp_zoom_index;
818    CArray<bool,1> tmp_mask;
819    CArray<double,1> tmp_val;
820    CArray<double,2> tmp_bnds;
821
822    buffer >> tmp_index;
823    index.reference(tmp_index);
824    buffer >> tmp_data_index;
825    data_index.reference(tmp_data_index);
826    buffer >> tmp_mask;
827    mask.reference(tmp_mask);
828
829    buffer >> hasValue;
830    if (hasValue)
831    {
832      buffer >> tmp_val;
833      value.reference(tmp_val);
834    }
835
836    buffer >> hasBounds_;
837    if (hasBounds_)
838    {
839      buffer >> tmp_bnds;
840      bounds.reference(tmp_bnds);
841    }
842
843    data_begin.setValue(0);
844    globalLocalIndexMap_.rehash(std::ceil(index.numElements()/globalLocalIndexMap_.max_load_factor()));
845    for (int idx = 0; idx < index.numElements(); ++idx) globalLocalIndexMap_[idx] = index(idx);
846  }
847
848  void CAxis::sendDistributedAttributes(void)
849  {
850    int ns, n, i, j, ind, nv, idx;
851    CContext* context = CContext::getCurrent();
852   
853    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1;
854    for (int p = 0; p < nbSrvPools; ++p)
855    {
856      CContextClient* client = (0 != context->clientPrimServer.size()) ? context->clientPrimServer[p] : context->client;
857
858      CEventClient eventData(getType(), EVENT_ID_DISTRIBUTED_ATTRIBUTES);
859
860      list<CMessage> listData;
861      list<CArray<int,1> > list_indi, list_dataInd, list_zoomInd;
862      list<CArray<bool,1> > list_mask;
863      list<CArray<double,1> > list_val;
864      list<CArray<double,2> > list_bounds;
865
866      int nbIndex = index.numElements();
867      CArray<int,1> dataIndex(nbIndex);
868      dataIndex = -1;
869      for (int inx = 0; inx < data_index.numElements(); ++inx)
870      {
871        if (0 <= data_index(inx) && data_index(inx) < nbIndex)
872          dataIndex(inx) = data_index(inx);
873      }
874
875      boost::unordered_map<int, std::vector<size_t> >::const_iterator it, iteMap;
876      iteMap = indSrv_.end();
877      for (int k = 0; k < connectedServerRank_.size(); ++k)
878      {
879        int nbData = 0;
880        int rank = connectedServerRank_[k];
881        int nbSendingClient = nbConnectedClients_[rank];
882        it = indSrv_.find(rank);
883        if (iteMap != it)
884          nbData = it->second.size();
885
886        list_indi.push_back(CArray<int,1>(nbData));
887        list_dataInd.push_back(CArray<int,1>(nbData));       
888        list_mask.push_back(CArray<bool,1>(nbData));
889
890        if (hasValue)
891          list_val.push_back(CArray<double,1>(nbData));
892
893        if (hasBounds_)
894        {
895          list_bounds.push_back(CArray<double,2>(2,nbData));
896        }
897
898        CArray<int,1>& indi = list_indi.back();
899        CArray<int,1>& dataIndi = list_dataInd.back();       
900        CArray<bool,1>& maskIndi = list_mask.back();
901
902        for (n = 0; n < nbData; ++n)
903        {
904          idx = static_cast<int>(it->second[n]);
905          indi(n) = idx;
906
907          ind = globalLocalIndexMap_[idx];
908          dataIndi(n) = dataIndex(ind);
909          maskIndi(n) = mask(ind);
910
911          if (hasValue)
912          {
913            CArray<double,1>& val = list_val.back();
914            val(n) = value(ind);
915          }
916
917          if (hasBounds_)
918          {
919            CArray<double,2>& boundsVal = list_bounds.back();
920            boundsVal(0, n) = bounds(0,n);
921            boundsVal(1, n) = bounds(1,n);
922          }
923        }
924
925        listData.push_back(CMessage());
926        listData.back() << this->getId()
927                        << list_indi.back() << list_dataInd.back() << list_mask.back();
928
929        listData.back() << hasValue;
930        if (hasValue)
931          listData.back() << list_val.back();
932
933        listData.back() << hasBounds_;
934        if (hasBounds_)
935          listData.back() << list_bounds.back();
936
937        eventData.push(rank, nbConnectedClients_[rank], listData.back());
938      }
939
940      client->sendEvent(eventData);
941    }
942  }
943
944  void CAxis::recvDistributedAttributes(CEventServer& event)
945  {
946    string axisId;
947    vector<int> ranks;
948    vector<CBufferIn*> buffers;
949
950    list<CEventServer::SSubEvent>::iterator it;
951    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
952    {
953      ranks.push_back(it->rank);
954      CBufferIn* buffer = it->buffer;
955      *buffer >> axisId;
956      buffers.push_back(buffer);
957    }
958    get(axisId)->recvDistributedAttributes(ranks, buffers);
959  }
960
961
962  void CAxis::recvDistributedAttributes(vector<int>& ranks, vector<CBufferIn*> buffers)
963  {
964    int nbReceived = ranks.size();
965    vector<CArray<int,1> > vec_indi(nbReceived), vec_dataInd(nbReceived), vec_zoomInd(nbReceived);   
966    vector<CArray<bool,1> > vec_mask(nbReceived);
967    vector<CArray<double,1> > vec_val(nbReceived);
968    vector<CArray<double,2> > vec_bounds(nbReceived);
969   
970    for (int idx = 0; idx < nbReceived; ++idx)
971    {     
972      CBufferIn& buffer = *buffers[idx];
973      buffer >> vec_indi[idx];
974      buffer >> vec_dataInd[idx];     
975      buffer >> vec_mask[idx];
976
977      buffer >> hasValue;
978      if (hasValue)
979        buffer >> vec_val[idx];
980
981      buffer >> hasBounds_;
982      if (hasBounds_)
983        buffer >> vec_bounds[idx];
984    }
985
986    int nbData = 0;
987    for (int idx = 0; idx < nbReceived; ++idx)
988    {
989      nbData += vec_indi[idx].numElements();
990    }
991
992    index.resize(nbData);
993    globalLocalIndexMap_.rehash(std::ceil(index.numElements()/globalLocalIndexMap_.max_load_factor()));
994    CArray<int,1> nonCompressedData(nbData);   
995    mask.resize(nbData);
996    if (hasValue)
997      value.resize(nbData);
998    if (hasBounds_)
999      bounds.resize(2,nbData);
1000
1001    nbData = 0;
1002    for (int idx = 0; idx < nbReceived; ++idx)
1003    {
1004      CArray<int,1>& indi = vec_indi[idx];
1005      CArray<int,1>& dataIndi = vec_dataInd[idx];
1006      CArray<bool,1>& maskIndi = vec_mask[idx];
1007      int nb = indi.numElements();
1008      for (int n = 0; n < nb; ++n)
1009      {
1010        index(nbData) = indi(n);
1011        globalLocalIndexMap_[indi(n)] = nbData;
1012        nonCompressedData(nbData) = (0 <= dataIndi(n)) ? nbData : -1;
1013        mask(nbData) = maskIndi(n);
1014        if (hasValue)
1015          value(nbData) = vec_val[idx](n);
1016        if (hasBounds_)
1017        {
1018          bounds(0,nbData) = vec_bounds[idx](0,n);
1019          bounds(1,nbData) = vec_bounds[idx](1,n);
1020        }
1021        ++nbData;
1022      }
1023    }
1024
1025    int nbIndex = index.numElements();
1026    int nbCompressedData = 0; 
1027    for (int idx = 0; idx < nonCompressedData.numElements(); ++idx)
1028    {
1029      if (0 <= nonCompressedData(idx) && nonCompressedData(idx) < nbIndex)
1030        ++nbCompressedData;       
1031    }
1032
1033    data_index.resize(nbCompressedData);
1034    nbCompressedData = 0;
1035    for (int idx = 0; idx < nonCompressedData.numElements(); ++idx)
1036    {
1037      if (0 <= nonCompressedData(idx) && nonCompressedData(idx) < nbIndex)
1038      {
1039        data_index(nbCompressedData) = nonCompressedData(idx);
1040        ++nbCompressedData;       
1041      }
1042    }
1043    data_begin.setValue(0);
1044
1045    if (hasLabel)
1046    {
1047      //label_srv(ind_srv) = labelVal( ind);
1048    }
1049  }
1050
1051  void CAxis::recvDistributionAttribute(CEventServer& event)
1052  {
1053    CBufferIn* buffer = event.subEvents.begin()->buffer;
1054    string axisId;
1055    *buffer >> axisId;
1056    get(axisId)->recvDistributionAttribute(*buffer);
1057  }
1058
1059  void CAxis::recvDistributionAttribute(CBufferIn& buffer)
1060  {
1061    int ni_srv, begin_srv, end_srv;
1062    int global_zoom_end, zoom_end;
1063    bool zoomIndex = zoomByIndex();
1064   
1065    std::vector<int> zoom_index_tmp;
1066    std::vector<int>::iterator itZoomBegin, itZoomEnd, itZoom;
1067
1068    buffer >> ni_srv >> begin_srv >> end_srv;
1069    // buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;   
1070    buffer >> isCompressible_;
1071    buffer >> orderPosInGrid;
1072    buffer >> globalDimGrid;   
1073
1074    // Set up new local size of axis on the receiving clients
1075    n.setValue(ni_srv);
1076    begin.setValue(begin_srv);
1077
1078    // If we have zoom by index then process it
1079    if (zoomIndex)
1080    {
1081      zoom_index_tmp.resize(global_zoom_index.numElements());
1082      std::copy(global_zoom_index.begin(), global_zoom_index.end(), zoom_index_tmp.begin());
1083      std::sort(zoom_index_tmp.begin(), zoom_index_tmp.end());
1084      itZoomBegin = std::lower_bound(zoom_index_tmp.begin(), zoom_index_tmp.end(), begin_srv);
1085      itZoomEnd   = std::upper_bound(zoom_index_tmp.begin(), zoom_index_tmp.end(), end_srv);     
1086      int sz = std::distance(itZoomBegin, itZoomEnd);
1087      zoom_index.resize(sz);
1088      itZoom = itZoomBegin;
1089      for (int i = 0; i < sz; ++i, ++itZoom)
1090      {
1091        zoom_index(i) = *(itZoom);
1092      }
1093    }
1094
1095    global_zoom_begin = zoomIndex ? 0 : global_zoom_begin ;
1096    global_zoom_n     = zoomIndex ? zoom_index_tmp.size() : global_zoom_n;
1097    global_zoom_end   = global_zoom_begin + global_zoom_n - 1;
1098
1099    zoom_begin = zoomIndex ? std::distance(itZoomBegin, zoom_index_tmp.begin())
1100                           : global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
1101    zoom_end   = zoomIndex ? std::distance(zoom_index_tmp.begin(), itZoomEnd) - 1 
1102                           : global_zoom_end < end_srv ? global_zoom_end : end_srv ;
1103    zoom_n     = zoom_end - zoom_begin + 1;
1104
1105    if (zoom_n<=0)
1106    {
1107      zoom_begin = 0; zoom_n = 0;
1108    }
1109
1110    if (n_glo == n)
1111    {
1112      zoom_begin = zoomIndex ? std::distance(itZoomBegin, zoom_index_tmp.begin())
1113                             : global_zoom_begin;     
1114      zoom_n     = zoomIndex ? zoom_index_tmp.size() : global_zoom_n;
1115    }
1116  }
1117
1118  /*!
1119    Compare two axis objects.
1120    They are equal if only if they have identical attributes as well as their values.
1121    Moreover, they must have the same transformations.
1122  \param [in] axis Compared axis
1123  \return result of the comparison
1124  */
1125  bool CAxis::isEqual(CAxis* obj)
1126  {
1127    vector<StdString> excludedAttr;
1128    excludedAttr.push_back("axis_ref");
1129
1130    bool objEqual = SuperClass::isEqual(obj, excludedAttr);   
1131    if (!objEqual) return objEqual;
1132
1133    TransMapTypes thisTrans = this->getAllTransformations();
1134    TransMapTypes objTrans  = obj->getAllTransformations();
1135
1136    TransMapTypes::const_iterator it, itb, ite;
1137    std::vector<ETranformationType> thisTransType, objTransType;
1138    for (it = thisTrans.begin(); it != thisTrans.end(); ++it)
1139      thisTransType.push_back(it->first);
1140    for (it = objTrans.begin(); it != objTrans.end(); ++it)
1141      objTransType.push_back(it->first);
1142
1143    if (thisTransType.size() != objTransType.size()) return false;
1144    for (int idx = 0; idx < thisTransType.size(); ++idx)
1145      objEqual &= (thisTransType[idx] == objTransType[idx]);
1146
1147    return objEqual;
1148  }
1149
1150  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
1151  {
1152    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
1153    return transformationMap_.back().second;
1154  }
1155
1156  bool CAxis::hasTransformation()
1157  {
1158    return (!transformationMap_.empty());
1159  }
1160
1161  void CAxis::setTransformations(const TransMapTypes& axisTrans)
1162  {
1163    transformationMap_ = axisTrans;
1164  }
1165
1166  CAxis::TransMapTypes CAxis::getAllTransformations(void)
1167  {
1168    return transformationMap_;
1169  }
1170
1171  void CAxis::duplicateTransformation(CAxis* src)
1172  {
1173    if (src->hasTransformation())
1174    {
1175      this->setTransformations(src->getAllTransformations());
1176    }
1177  }
1178
1179  /*!
1180   * Go through the hierarchy to find the axis from which the transformations must be inherited
1181   */
1182  void CAxis::solveInheritanceTransformation()
1183  {
1184    if (hasTransformation() || !hasDirectAxisReference())
1185      return;
1186
1187    CAxis* axis = this;
1188    std::vector<CAxis*> refAxis;
1189    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
1190    {
1191      refAxis.push_back(axis);
1192      axis = axis->getDirectAxisReference();
1193    }
1194
1195    if (axis->hasTransformation())
1196      for (size_t i = 0; i < refAxis.size(); ++i)
1197        refAxis[i]->setTransformations(axis->getAllTransformations());
1198  }
1199
1200  void CAxis::parse(xml::CXMLNode & node)
1201  {
1202    SuperClass::parse(node);
1203
1204    if (node.goToChildElement())
1205    {
1206      StdString nodeElementName;
1207      do
1208      {
1209        StdString nodeId("");
1210        if (node.getAttributes().end() != node.getAttributes().find("id"))
1211        { nodeId = node.getAttributes()["id"]; }
1212
1213        nodeElementName = node.getElementName();
1214        std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_.end(), it;
1215        it = transformationMapList_.find(nodeElementName);
1216        if (ite != it)
1217        {
1218          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1219                                                                                                               nodeId,
1220                                                                                                               &node)));
1221        }
1222        else
1223        {
1224          ERROR("void CAxis::parse(xml::CXMLNode & node)",
1225                << "The transformation " << nodeElementName << " has not been supported yet.");
1226        }
1227      } while (node.goToNextElement()) ;
1228      node.goToParentElement();
1229    }
1230  }
1231
1232  DEFINE_REF_FUNC(Axis,axis)
1233
1234   ///---------------------------------------------------------------
1235
1236} // namespace xios
Note: See TracBrowser for help on using the repository browser.