source: XIOS/dev/dev_olga/src/node/axis.cpp @ 1144

Last change on this file since 1144 was 1144, checked in by mhnguyen, 7 years ago

Cleaning up some redundant codes

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 39.4 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isClientAfterTransformationChecked(false)
27      , hasBounds_(false), isCompressible_(false)
28      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
29      , transformationMap_(), hasValue(false), doZoomByIndex_(false)
30      , computedWrittenIndex_(false)
31   {
32   }
33
34   CAxis::CAxis(const StdString & id)
35      : CObjectTemplate<CAxis>(id)
36      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
37      , isClientAfterTransformationChecked(false)
38      , hasBounds_(false), isCompressible_(false)
39      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
40      , transformationMap_(), hasValue(false), doZoomByIndex_(false)
41      , computedWrittenIndex_(false)
42   {
43   }
44
45   CAxis::~CAxis(void)
46   { /* Ne rien faire de plus */ }
47
48   std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
49   bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_);
50   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
51   {
52     m["zoom_axis"] = TRANS_ZOOM_AXIS;
53     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
54     m["inverse_axis"] = TRANS_INVERSE_AXIS;
55     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
56     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
57   }
58
59   ///---------------------------------------------------------------
60   bool CAxis::IsWritten(const StdString & filename) const
61   {
62      return (this->relFiles.find(filename) != this->relFiles.end());
63   }
64
65   bool CAxis::isWrittenCompressed(const StdString& filename) const
66   {
67      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
68   }
69
70   bool CAxis::isDistributed(void) const
71   {
72      return (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
73             (!this->n.isEmpty() && (this->n != this->n_glo));;
74   }
75
76   /*!
77    * Test whether the data defined on the axis can be outputted in a compressed way.
78    *
79    * \return true if and only if a mask was defined for this axis
80    */
81   bool CAxis::isCompressible(void) const
82   {
83      return isCompressible_;
84   }
85
86   void CAxis::addRelFile(const StdString & filename)
87   {
88      this->relFiles.insert(filename);
89   }
90
91   void CAxis::addRelFileCompressed(const StdString& filename)
92   {
93      this->relFilesCompressed.insert(filename);
94   }
95
96   //----------------------------------------------------------------
97
98   /*!
99     Returns the number of indexes written by each server.
100     \return the number of indexes written by each server
101   */
102   int CAxis::getNumberWrittenIndexes() const
103   {
104     return numberWrittenIndexes_;
105   }
106
107   /*!
108     Returns the total number of indexes written by the servers.
109     \return the total number of indexes written by the servers
110   */
111   int CAxis::getTotalNumberWrittenIndexes() const
112   {
113     return totalNumberWrittenIndexes_;
114   }
115
116   /*!
117     Returns the offset of indexes written by each server.
118     \return the offset of indexes written by each server
119   */
120   int CAxis::getOffsetWrittenIndexes() const
121   {
122     return offsetWrittenIndexes_;
123   }
124
125   //----------------------------------------------------------------
126
127   /*!
128    * Compute the minimum buffer size required to send the attributes to the server(s).
129    *
130    * \return A map associating the server rank with its minimum buffer size.
131    */
132   std::map<int, StdSize> CAxis::getAttributesBufferSize()
133   {
134//     CContextClient* client = CContext::getCurrent()->client;
135     // For now the assumption is that secondary server pools consist of the same number of procs.
136     // CHANGE the line below if the assumption changes.
137     CContext* context = CContext::getCurrent();
138     CContextClient* client = (0 != context->clientPrimServer.size()) ? context->clientPrimServer[0] : context->client;
139
140     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
141
142     bool isNonDistributed = (n == n_glo);
143
144     if (client->isServerLeader())
145     {
146       // size estimation for sendServerAttribut
147       size_t size = 6 * sizeof(size_t);
148       // size estimation for sendNonDistributedValue
149       if (isNonDistributed)
150         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
151       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
152
153       const std::list<int>& ranks = client->getRanksServerLeader();
154       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
155       {
156         if (size > attributesSizes[*itRank])
157           attributesSizes[*itRank] = size;
158       }
159     }
160
161     if (!isNonDistributed)
162     {
163       // size estimation for sendDistributedValue
164       boost::unordered_map<int, vector<size_t> >::const_iterator it, ite = indSrv_.end();
165       for (it = indSrv_.begin(); it != ite; ++it)
166       {
167         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
168         if (isCompressible_)
169           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
170
171         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
172         if (hasBounds_)
173           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
174
175         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
176         if (size > attributesSizes[it->first])
177           attributesSizes[it->first] = size;
178       }
179     }
180
181     return attributesSizes;
182   }
183
184   //----------------------------------------------------------------
185
186   StdString CAxis::GetName(void)   { return (StdString("axis")); }
187   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
188   ENodeType CAxis::GetType(void)   { return (eAxis); }
189
190   //----------------------------------------------------------------
191
192   CAxis* CAxis::createAxis()
193   {
194     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
195     return axis;
196   }
197
198   /*!
199     Check common attributes of an axis.
200     This check should be done in the very beginning of work flow
201   */
202   void CAxis::checkAttributes(void)
203   {
204      if (this->n_glo.isEmpty())
205        ERROR("CAxis::checkAttributes(void)",
206              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
207              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
208      StdSize size = this->n_glo.getValue();
209
210      if (!this->index.isEmpty())
211      {
212        if (n.isEmpty()) n = index.numElements();
213
214        // It's not so correct but if begin is not the first value of index
215        // then data on the local axis has user-defined distribution. In this case, begin has no meaning.
216        if (begin.isEmpty()) begin = index(0);         
217      }
218      else 
219      {
220        if (!this->begin.isEmpty())
221        {
222          if (begin < 0 || begin > size - 1)
223            ERROR("CAxis::checkAttributes(void)",
224                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
225                  << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
226        }
227        else this->begin.setValue(0);
228
229        if (!this->n.isEmpty())
230        {
231          if (n < 0 || n > size)
232            ERROR("CAxis::checkAttributes(void)",
233                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
234                  << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
235        }
236        else this->n.setValue(size);
237
238        {
239          index.resize(n);
240          for (int i = 0; i < n; ++i) index(i) = i+begin;
241        }
242      }
243
244      if (!this->value.isEmpty())
245      {
246        StdSize true_size = value.numElements();
247        if (this->n.getValue() != true_size)
248          ERROR("CAxis::checkAttributes(void)",
249                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
250                << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
251        this->hasValue = true;
252      }
253
254      this->checkData();
255      this->checkZoom();
256      this->checkMask();
257      this->checkBounds();     
258   }
259
260   /*!
261      Check the validity of data and fill in values if any.
262   */
263   void CAxis::checkData()
264   {
265      if (data_begin.isEmpty()) data_begin.setValue(0);
266
267      if (data_n.isEmpty())
268      {
269        data_n.setValue(n);
270      }
271      else if (data_n.getValue() < 0)
272      {
273        ERROR("CAxis::checkData(void)",
274              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
275              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
276      }
277
278      if (data_index.isEmpty())
279      {
280        data_index.resize(data_n);
281        for (int i = 0; i < data_n; ++i) data_index(i) = i;
282      }
283   }
284
285   /*!
286     Check validity of zoom info and fill in values if any.
287   */
288   void CAxis::checkZoom(void)
289   {
290     if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0);
291     if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue());
292     if (zoom_index.isEmpty())
293     {
294       zoom_index.setValue(index.getValue());
295     }
296     if (zoom_n.isEmpty()) zoom_n.setValue(n);
297     if (zoom_begin.isEmpty()) zoom_begin.setValue(begin);
298   }
299
300   /*!
301     Check validity of mask info and fill in values if any.
302   */
303   void CAxis::checkMask()
304   {
305      if (!mask.isEmpty())
306      {
307         if (mask.extent(0) != n)
308           ERROR("CAxis::checkMask(void)",
309                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
310                 << "The mask does not have the same size as the local domain." << std::endl
311                 << "Local size is " << n.getValue() << "." << std::endl
312                 << "Mask size is " << mask.extent(0) << ".");
313      }
314      else // (mask.isEmpty())
315      { // If no mask was defined, we create a default one without any masked point.
316         mask.resize(n);
317         for (int i = 0; i < n; ++i)
318         {
319           mask(i) = true;
320         }
321      }
322   }
323
324   /*!
325     Check validity of bounds info and fill in values if any.
326   */
327   void CAxis::checkBounds()
328   {
329     if (!bounds.isEmpty())
330     {
331       if (bounds.extent(0) != 2 || bounds.extent(1) != n)
332         ERROR("CAxis::checkAttributes(void)",
333               << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
334               << "Axis size is " << n.getValue() << "." << std::endl
335               << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
336       hasBounds_ = true;
337     }
338     else hasBounds_ = false;
339   }
340
341   void CAxis::checkEligibilityForCompressedOutput()
342   {
343     // We don't check if the mask is valid here, just if a mask has been defined at this point.
344     isCompressible_ = !mask.isEmpty();
345   }
346
347   bool CAxis::dispatchEvent(CEventServer& event)
348   {
349      if (SuperClass::dispatchEvent(event)) return true;
350      else
351      {
352        switch(event.type)
353        {
354           case EVENT_ID_DISTRIBUTION_ATTRIBUTE :
355             recvDistributionAttribute(event);
356             return true;
357             break;
358          case EVENT_ID_NON_DISTRIBUTED_ATTRIBUTES:
359            recvNonDistributedAttributes(event);
360            return true;
361            break;
362          case EVENT_ID_DISTRIBUTED_ATTRIBUTES:
363            recvDistributedAttributes(event);
364            return true;
365            break;
366           default :
367             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
368                    << "Unknown Event");
369           return false;
370         }
371      }
372   }
373
374   /*!
375     Check attributes on client side (This name is still adequate???)
376   */
377   void CAxis::checkAttributesOnClient()
378   {
379     if (this->areClientAttributesChecked_) return;
380
381     this->checkAttributes();
382
383     this->areClientAttributesChecked_ = true;
384   }
385
386   /*
387     The (spatial) transformation sometimes can change attributes of an axis. Therefore, we should recheck them.
388   */
389   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
390                                                          CServerDistributionDescription::ServerDistributionType distType)
391   {
392     CContext* context=CContext::getCurrent() ;
393
394     if (this->isClientAfterTransformationChecked) return;
395     if (context->hasClient)
396     {
397       if (index.numElements() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);
398     }
399
400     this->isClientAfterTransformationChecked = true;
401   }
402
403   // Send all checked attributes to server
404   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
405                                     CServerDistributionDescription::ServerDistributionType distType)
406   {
407     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
408     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
409     CContext* context = CContext::getCurrent();
410
411     if (this->isChecked) return;
412     if (context->hasClient) sendAttributes(globalDim, orderPositionInGrid, distType);   
413
414     this->isChecked = true;
415   }
416
417  /*!
418    Send attributes from one client to other clients
419    \param[in] globalDim global dimension of grid which contains this axis
420    \param[in] order
421  */
422  void CAxis::sendAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
423                             CServerDistributionDescription::ServerDistributionType distType)
424  {
425     if (index.numElements() == n_glo.getValue())
426       sendNonDistributedAttributes();
427     else
428     {
429       sendDistributedAttributes();       
430     }
431     sendDistributionAttribute(globalDim, orderPositionInGrid, distType);
432  }
433
434  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
435                                     CServerDistributionDescription::ServerDistributionType distType)
436  {
437    CContext* context = CContext::getCurrent();
438
439    // int nbSrvPools = (context->hasServer) ? context->clientPrimServer.size() : 1;
440    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1;
441    for (int p = 0; p < nbSrvPools; ++p)
442    {
443      CContextClient* client = (0 != context->clientPrimServer.size()) ? context->clientPrimServer[p] : context->client;
444      int nbServer = client->serverSize;
445      int range, clientSize = client->clientSize;
446      int rank = client->clientRank;
447
448      size_t ni = this->n.getValue();
449      size_t ibegin = this->begin.getValue();
450      size_t global_zoom_end = global_zoom_begin+global_zoom_n-1;
451      size_t nZoomCount = 0;
452      size_t nbIndex = index.numElements();
453
454      if (doZoomByIndex_) 
455      {
456        nZoomCount = zoom_index.numElements();
457      }
458      else
459      {
460        for (size_t idx = 0; idx < nbIndex; ++idx)
461        {
462          globalLocalIndexMap_[index(idx)] = idx;
463          size_t globalIndex = index(idx);
464          if (globalIndex >= global_zoom_begin && globalIndex <= global_zoom_end) ++nZoomCount;
465        }
466      }
467
468
469      CArray<size_t,1> globalIndexAxis(nbIndex);
470      std::vector<size_t> globalAxisZoom(nZoomCount);
471      nZoomCount = 0;
472      if (doZoomByIndex_) 
473      {
474        int nbIndexZoom = zoom_index.numElements();       
475        for (int i = 0; i < nbIndexZoom; ++i)
476        {   
477          globalIndexAxis(i) = zoom_index(i);
478        }
479      }
480      else 
481      {
482        for (size_t idx = 0; idx < nbIndex; ++idx)
483        {
484          size_t globalIndex = index(idx);
485          globalIndexAxis(idx) = globalIndex;
486          if (globalIndex >= global_zoom_begin && globalIndex <= global_zoom_end)
487          {
488            globalAxisZoom[nZoomCount] = globalIndex;
489            ++nZoomCount;
490          }
491        }
492
493        int end       = begin + n -1;       
494        zoom_begin    = global_zoom_begin > begin ? global_zoom_begin : begin;
495        int zoom_end  = global_zoom_end < end ? zoom_end : end;
496        zoom_n        = zoom_end-zoom_begin+1;
497      }
498
499      std::set<int> writtenInd;
500      if (isCompressible_)
501      {
502        for (int idx = 0; idx < data_index.numElements(); ++idx)
503        {
504          int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
505
506          if (ind >= 0 && ind < ni && mask(ind))
507          {
508            ind += ibegin;
509            if (ind >= global_zoom_begin && ind <= global_zoom_end)
510              writtenInd.insert(ind);
511          }
512        }
513      }
514
515      CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer, distType);
516      int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
517      CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer;
518      if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
519      {
520        std::vector<int> nGlobAxis(1);
521        nGlobAxis[0] = n_glo.getValue();
522
523        size_t globalSizeIndex = 1, indexBegin, indexEnd;
524        for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
525        indexBegin = 0;
526        if (globalSizeIndex <= clientSize)
527        {
528          indexBegin = rank%globalSizeIndex;
529          indexEnd = indexBegin;
530        }
531        else
532        {
533          for (int i = 0; i < clientSize; ++i)
534          {
535            range = globalSizeIndex / clientSize;
536            if (i < (globalSizeIndex%clientSize)) ++range;
537            if (i == client->clientRank) break;
538            indexBegin += range;
539          }
540          indexEnd = indexBegin + range - 1;
541        }
542
543        CArray<size_t,1> globalIndex(index.numElements());
544        for (size_t idx = 0; idx < globalIndex.numElements(); ++idx)
545          globalIndex(idx) = index(idx);
546
547        CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
548        serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
549        CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
550        clientServerMap->computeServerIndexMapping(globalIndex);
551        globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
552        delete clientServerMap;
553      }
554      else
555      {
556        std::vector<size_t> globalIndexServer(n_glo.getValue());
557        for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
558        {
559          globalIndexServer[idx] = idx;
560        }
561
562        for (int idx = 0; idx < nbServer; ++idx)
563        {
564          globalIndexAxisOnServer[idx] = globalIndexServer;
565        }
566      }
567
568      indSrv_.swap(globalIndexAxisOnServer);
569
570      CClientServerMapping::GlobalIndexMap::const_iterator it = indSrv_.begin(),
571                                                           ite = indSrv_.end();
572
573      connectedServerRank_.clear();
574      for (it = indSrv_.begin(); it != ite; ++it) {
575        connectedServerRank_.push_back(it->first);
576      }
577
578      nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
579    }
580  }
581
582   void CAxis::computeWrittenIndex()
583   { 
584      if (computedWrittenIndex_) return;
585      computedWrittenIndex_ = true;
586
587      CContext* context=CContext::getCurrent();     
588      CContextServer* server = context->server; 
589
590      std::vector<int> nBegin(1), nSize(1), nBeginGlobal(1), nGlob(1);
591      nBegin[0]       = zoom_begin;
592      nSize[0]        = zoom_n;   
593      nBeginGlobal[0] = 0; 
594      nGlob[0]        = n_glo;
595      CDistributionServer srvDist(server->intraCommSize, nBegin, nSize, nBeginGlobal, nGlob); 
596      const CArray<size_t,1>& writtenGlobalIndex  = srvDist.getGlobalIndex();
597
598      size_t nbWritten = 0, indGlo;     
599      boost::unordered_map<size_t,size_t>::const_iterator itb = globalLocalIndexMap_.begin(),
600                                                          ite = globalLocalIndexMap_.end(), it;         
601      CArray<size_t,1>::const_iterator itSrvb = writtenGlobalIndex.begin(),
602                                       itSrve = writtenGlobalIndex.end(), itSrv;     
603
604      for (itSrv = itSrvb; itSrv != itSrve; ++itSrv)
605      {
606        indGlo = *itSrv;
607        if (ite != globalLocalIndexMap_.find(indGlo))
608        {         
609          ++nbWritten;
610        }                 
611      }
612
613      localIndexToWriteOnServer.resize(nbWritten);
614
615      nbWritten = 0;
616      for (itSrv = itSrvb; itSrv != itSrve; ++itSrv)
617      {
618        indGlo = *itSrv;
619        if (ite != globalLocalIndexMap_.find(indGlo))
620        {
621          localIndexToWriteOnServer(nbWritten) = globalLocalIndexMap_[indGlo];
622          ++nbWritten;
623        }                 
624      }
625
626      if (isCompressible())
627      {
628        nbWritten = 0;
629        boost::unordered_map<size_t,size_t> localGlobalIndexMap;
630        for (itSrv = itSrvb; itSrv != itSrve; ++itSrv)
631        {
632          indGlo = *itSrv;
633          if (ite != globalLocalIndexMap_.find(indGlo))
634          {
635            localGlobalIndexMap[localIndexToWriteOnServer(nbWritten)] = indGlo;
636            ++nbWritten;
637          }                 
638        }
639
640        nbWritten = 0;
641        for (int idx = 0; idx < data_index.numElements(); ++idx)
642        {
643          if (localGlobalIndexMap.end() != localGlobalIndexMap.find(data_index(idx)))
644          {
645            ++nbWritten;
646          }
647        }
648
649        compressedIndexToWriteOnServer.resize(nbWritten);
650        nbWritten = 0;
651        for (int idx = 0; idx < data_index.numElements(); ++idx)
652        {
653          if (localGlobalIndexMap.end() != localGlobalIndexMap.find(data_index(idx)))
654          {
655            compressedIndexToWriteOnServer(nbWritten) = localGlobalIndexMap[data_index(idx)];
656            ++nbWritten;
657          }
658        }
659
660        numberWrittenIndexes_ = nbWritten;
661        if (isDistributed())
662        {
663               
664          MPI_Allreduce(&numberWrittenIndexes_, &totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
665          MPI_Scan(&numberWrittenIndexes_, &offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
666          offsetWrittenIndexes_ -= numberWrittenIndexes_;
667        }
668        else
669          totalNumberWrittenIndexes_ = numberWrittenIndexes_;
670      }
671
672   }
673
674
675
676  void CAxis::sendDistributionAttribute(const std::vector<int>& globalDim, int orderPositionInGrid,
677                                        CServerDistributionDescription::ServerDistributionType distType)
678  {
679    CContext* context = CContext::getCurrent();
680
681    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 0) : 1;
682    for (int i = 0; i < nbSrvPools; ++i)
683    {
684      CContextClient* contextClientTmp = (context->hasServer) ? context->clientPrimServer[i]
685                                                                         : context->client;
686      int nbServer = contextClientTmp->serverSize;
687
688      CServerDistributionDescription serverDescription(globalDim, nbServer);
689      serverDescription.computeServerDistribution();
690
691      std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
692      std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
693
694      globalDimGrid.resize(globalDim.size());
695      for (int idx = 0; idx < globalDim.size(); ++idx) globalDimGrid(idx) = globalDim[idx];
696
697      CEventClient event(getType(),EVENT_ID_DISTRIBUTION_ATTRIBUTE);
698      if (contextClientTmp->isServerLeader())
699      {
700        std::list<CMessage> msgs;
701
702        const std::list<int>& ranks = contextClientTmp->getRanksServerLeader();
703        for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
704        {
705          // Use const int to ensure CMessage holds a copy of the value instead of just a reference
706          const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
707          const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
708          const int end   = begin + ni - 1;
709
710          msgs.push_back(CMessage());
711          CMessage& msg = msgs.back();
712          msg << this->getId();
713          msg << ni << begin << end;
714          msg << global_zoom_begin.getValue() << global_zoom_n.getValue();
715          msg << isCompressible_;
716          msg << orderPositionInGrid;
717          msg << globalDimGrid;
718
719          event.push(*itRank,1,msg);
720        }
721        contextClientTmp->sendEvent(event);
722      }
723      else contextClientTmp->sendEvent(event);
724    }
725  }
726
727  void CAxis::sendNonDistributedAttributes()
728  {
729    CContext* context = CContext::getCurrent();
730
731    // int nbSrvPools = (context->hasServer) ? context->clientPrimServer.size() : 1;
732    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1;
733    for (int p = 0; p < nbSrvPools; ++p)
734    {
735      CContextClient* client = (0 != context->clientPrimServer.size()) ? context->clientPrimServer[p] : context->client;
736
737      CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_ATTRIBUTES);
738      size_t nbIndex = index.numElements();
739      size_t nbDataIndex = 0;
740
741      for (int idx = 0; idx < data_index.numElements(); ++idx)
742      {
743        int ind = data_index(idx);
744        if (ind >= 0 && ind < nbIndex) ++nbDataIndex;
745      }
746
747      CArray<int,1> dataIndex(nbDataIndex);
748      nbDataIndex = 0;
749      for (int idx = 0; idx < data_index.numElements(); ++idx)
750      {
751        int ind = data_index(idx);
752        if (ind >= 0 && ind < nbIndex)
753        {
754          dataIndex(nbDataIndex) = ind;
755          ++nbDataIndex;
756        }
757      }
758
759      if (client->isServerLeader())
760      {
761        std::list<CMessage> msgs;
762
763        const std::list<int>& ranks = client->getRanksServerLeader();
764        for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
765        {
766          msgs.push_back(CMessage());
767          CMessage& msg = msgs.back();
768          msg << this->getId();
769          msg << index.getValue() << dataIndex << mask.getValue();
770
771          msg << doZoomByIndex_;
772          if (doZoomByIndex_) msg << zoom_index.getValue();
773          msg << hasValue;
774          if (hasValue) msg << value.getValue();
775          msg << hasBounds_;
776          if (hasBounds_) msg << bounds.getValue();
777
778          event.push(*itRank, 1, msg);
779        }
780        client->sendEvent(event);
781      }
782      else client->sendEvent(event);
783    }
784  }
785
786  void CAxis::recvNonDistributedAttributes(CEventServer& event)
787  {
788    list<CEventServer::SSubEvent>::iterator it;
789    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
790    {
791      CBufferIn* buffer = it->buffer;
792      string axisId;
793      *buffer >> axisId;
794      get(axisId)->recvNonDistributedAttributes(it->rank, *buffer);
795    }
796  }
797
798  void CAxis::recvNonDistributedAttributes(int rank, CBufferIn& buffer)
799  { 
800    CArray<int,1> tmp_index, tmp_data_index, tmp_zoom_index;
801    CArray<bool,1> tmp_mask;
802    CArray<double,1> tmp_val;
803    CArray<double,2> tmp_bnds;
804
805    buffer >> tmp_index;
806    index.reference(tmp_index);
807    buffer >> tmp_data_index;
808    data_index.reference(tmp_data_index);
809    buffer >> tmp_mask;
810    mask.reference(tmp_mask);
811
812    buffer >> doZoomByIndex_;
813    if (doZoomByIndex_)
814    {
815      buffer >> tmp_zoom_index;
816      zoom_index.reference(tmp_zoom_index);
817    }
818
819    buffer >> hasValue;
820    if (hasValue)
821    {
822      buffer >> tmp_val;
823      value.reference(tmp_val);
824    }
825
826    buffer >> hasBounds_;
827    if (hasBounds_)
828    {
829      buffer >> tmp_bnds;
830      bounds.reference(tmp_bnds);
831    }
832
833    data_begin.setValue(0);
834    globalLocalIndexMap_.rehash(std::ceil(index.numElements()/globalLocalIndexMap_.max_load_factor()));
835    for (int idx = 0; idx < index.numElements(); ++idx) globalLocalIndexMap_[idx] = index(idx);
836  }
837
838  void CAxis::sendDistributedAttributes(void)
839  {
840    int ns, n, i, j, ind, nv, idx;
841    CContext* context = CContext::getCurrent();
842
843    //int nbSrvPools = (context->hasServer) ? context->clientPrimServer.size() : 1;
844    int nbSrvPools = (context->hasServer) ? (context->hasClient ? context->clientPrimServer.size() : 1) : 1;
845    for (int p = 0; p < nbSrvPools; ++p)
846    {
847      CContextClient* client = (0 != context->clientPrimServer.size()) ? context->clientPrimServer[p] : context->client;
848
849      CEventClient eventData(getType(), EVENT_ID_DISTRIBUTED_ATTRIBUTES);
850
851      list<CMessage> listData;
852      list<CArray<int,1> > list_indi, list_dataInd, list_zoomInd;
853      list<CArray<bool,1> > list_mask;
854      list<CArray<double,1> > list_val;
855      list<CArray<double,2> > list_bounds;
856
857      int nbIndex = index.numElements();
858      CArray<int,1> dataIndex(nbIndex);
859      dataIndex = -1;
860      for (int inx = 0; inx < data_index.numElements(); ++inx)
861      {
862        if (0 <= data_index(inx) && data_index(inx) < nbIndex)
863          dataIndex(inx) = data_index(inx);
864      }
865
866      boost::unordered_map<int, std::vector<size_t> >::const_iterator it, iteMap;
867      iteMap = indSrv_.end();
868      for (int k = 0; k < connectedServerRank_.size(); ++k)
869      {
870        int nbData = 0;
871        int rank = connectedServerRank_[k];
872        int nbSendingClient = nbConnectedClients_[rank];
873        it = indSrv_.find(rank);
874        if (iteMap != it)
875          nbData = it->second.size();
876
877        list_indi.push_back(CArray<int,1>(nbData));
878        list_dataInd.push_back(CArray<int,1>(nbData));       
879        list_mask.push_back(CArray<bool,1>(nbData));
880
881        if (doZoomByIndex_)
882          list_zoomInd.push_back(CArray<int,1>(nbData));
883
884        if (hasValue)
885          list_val.push_back(CArray<double,1>(nbData));
886
887        if (hasBounds_)
888        {
889          list_bounds.push_back(CArray<double,2>(2,nbData));
890        }
891
892        CArray<int,1>& indi = list_indi.back();
893        CArray<int,1>& dataIndi = list_dataInd.back();       
894        CArray<bool,1>& maskIndi = list_mask.back();
895
896        for (n = 0; n < nbData; ++n)
897        {
898          idx = static_cast<int>(it->second[n]);
899          indi(n) = idx;
900
901          ind = globalLocalIndexMap_[idx];
902          dataIndi(n) = dataIndex(ind);
903          maskIndi(n) = mask(ind);
904
905          if (doZoomByIndex_)
906          {
907            CArray<int,1>& zoomIndi = list_zoomInd.back();
908            zoomIndi(n) = zoom_index(ind);
909          }
910
911          if (hasValue)
912          {
913            CArray<double,1>& val = list_val.back();
914            val(n) = value(ind);
915          }
916
917          if (hasBounds_)
918          {
919            CArray<double,2>& boundsVal = list_bounds.back();
920            boundsVal(0, n) = bounds(0,n);
921            boundsVal(1, n) = bounds(1,n);
922          }
923        }
924
925        listData.push_back(CMessage());
926        listData.back() << this->getId()
927                        << list_indi.back() << list_dataInd.back() << list_mask.back();
928
929        listData.back() << doZoomByIndex_;           
930        if (doZoomByIndex_)
931          listData.back() << list_zoomInd.back();
932
933        listData.back() << hasValue;
934        if (hasValue)
935          listData.back() << list_val.back();
936
937        listData.back() << hasBounds_;
938        if (hasBounds_)
939          listData.back() << list_bounds.back();
940
941        eventData.push(rank, nbConnectedClients_[rank], listData.back());
942      }
943
944      client->sendEvent(eventData);
945    }
946  }
947
948  void CAxis::recvDistributedAttributes(CEventServer& event)
949  {
950    string axisId;
951    vector<int> ranks;
952    vector<CBufferIn*> buffers;
953
954    list<CEventServer::SSubEvent>::iterator it;
955    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
956    {
957      ranks.push_back(it->rank);
958      CBufferIn* buffer = it->buffer;
959      *buffer >> axisId;
960      buffers.push_back(buffer);
961    }
962    get(axisId)->recvDistributedAttributes(ranks, buffers);
963  }
964
965
966  void CAxis::recvDistributedAttributes(vector<int>& ranks, vector<CBufferIn*> buffers)
967  {
968    int nbReceived = ranks.size();
969    vector<CArray<int,1> > vec_indi(nbReceived), vec_dataInd(nbReceived), vec_zoomInd(nbReceived);   
970    vector<CArray<bool,1> > vec_mask(nbReceived);
971    vector<CArray<double,1> > vec_val(nbReceived);
972    vector<CArray<double,2> > vec_bounds(nbReceived);
973   
974    for (int idx = 0; idx < nbReceived; ++idx)
975    {     
976      CBufferIn& buffer = *buffers[idx];
977      buffer >> vec_indi[idx];
978      buffer >> vec_dataInd[idx];     
979      buffer >> vec_mask[idx];
980
981      buffer >> doZoomByIndex_;
982      if (doZoomByIndex_)
983        buffer >> vec_zoomInd[idx];
984
985      buffer >> hasValue;
986      if (hasValue)
987        buffer >> vec_val[idx];
988
989      buffer >> hasBounds_;
990      if (hasBounds_)
991        buffer >> vec_bounds[idx];
992    }
993
994    int nbData = 0;
995    for (int idx = 0; idx < nbReceived; ++idx)
996    {
997      nbData += vec_indi[idx].numElements();
998    }
999
1000    index.resize(nbData);
1001    globalLocalIndexMap_.rehash(std::ceil(index.numElements()/globalLocalIndexMap_.max_load_factor()));
1002    CArray<int,1> nonCompressedData(nbData);   
1003    mask.resize(nbData);
1004    if (hasValue)
1005      value.resize(nbData);
1006    if (hasBounds_)
1007      bounds.resize(2,nbData);
1008
1009    nbData = 0;
1010    for (int idx = 0; idx < nbReceived; ++idx)
1011    {
1012      CArray<int,1>& indi = vec_indi[idx];
1013      CArray<int,1>& dataIndi = vec_dataInd[idx];
1014      CArray<bool,1>& maskIndi = vec_mask[idx];
1015      int nb = indi.numElements();
1016      for (int n = 0; n < nb; ++n)
1017      {
1018        index(nbData) = indi(n);
1019        globalLocalIndexMap_[indi(n)] = nbData;
1020        nonCompressedData(nbData) = (0 <= dataIndi(n)) ? nbData : -1;
1021        mask(nbData) = maskIndi(n);
1022        if (hasValue)
1023          value(nbData) = vec_val[idx](n);
1024        if (hasBounds_)
1025        {
1026          bounds(0,nbData) = vec_bounds[idx](0,n);
1027          bounds(1,nbData) = vec_bounds[idx](1,n);
1028        }
1029        ++nbData;
1030      }
1031    }
1032
1033    int nbIndex = index.numElements();
1034    int nbCompressedData = 0; 
1035    for (int idx = 0; idx < nonCompressedData.numElements(); ++idx)
1036    {
1037      if (0 <= nonCompressedData(idx) && nonCompressedData(idx) < nbIndex)
1038        ++nbCompressedData;       
1039    }
1040
1041    data_index.resize(nbCompressedData);
1042    nbCompressedData = 0;
1043    for (int idx = 0; idx < nonCompressedData.numElements(); ++idx)
1044    {
1045      if (0 <= nonCompressedData(idx) && nonCompressedData(idx) < nbIndex)
1046      {
1047        data_index(nbCompressedData) = nonCompressedData(idx);
1048        ++nbCompressedData;       
1049      }
1050    }
1051    data_begin.setValue(0);
1052
1053    if (doZoomByIndex_)
1054    {
1055      int nbZoomIndex = 0;
1056      for (int idx = 0; idx < nbReceived; ++idx)
1057      {
1058        nbZoomIndex += vec_zoomInd[idx].numElements();
1059      }
1060
1061      zoom_index.resize(nbZoomIndex);
1062      nbZoomIndex = 0;     
1063      for (int idx = 0; idx < nbReceived; ++idx)
1064      {     
1065        CArray<int,1>& tmp = vec_zoomInd[idx];
1066        for (int i = 0; i < tmp.size(); ++i)
1067        {
1068          zoom_index(nbZoomIndex) = tmp(i);
1069          ++nbZoomIndex;
1070        }       
1071      }
1072    }
1073  }
1074
1075  void CAxis::recvDistributionAttribute(CEventServer& event)
1076  {
1077    CBufferIn* buffer = event.subEvents.begin()->buffer;
1078    string axisId;
1079    *buffer >> axisId;
1080    get(axisId)->recvDistributionAttribute(*buffer);
1081  }
1082
1083  void CAxis::recvDistributionAttribute(CBufferIn& buffer)
1084  {
1085    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp;
1086
1087    buffer >> ni_srv >> begin_srv >> end_srv;
1088    buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp;
1089    buffer >> isCompressible_;
1090    buffer >> orderPosInGrid;
1091    buffer >> globalDimGrid;
1092
1093    n.setValue(ni_srv);
1094    begin.setValue(begin_srv);
1095    global_zoom_begin = global_zoom_begin_tmp;
1096    global_zoom_n  = global_zoom_n_tmp;
1097    int global_zoom_end = global_zoom_begin + global_zoom_n - 1;
1098
1099    zoom_begin = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
1100    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
1101    zoom_n  = zoom_end_srv - zoom_begin_srv + 1;
1102
1103    if (zoom_n<=0)
1104    {
1105      zoom_begin = 0; zoom_end_srv = 0; zoom_n = 0;
1106    }
1107
1108    if (n_glo == n)
1109    {
1110      zoom_begin = global_zoom_begin;
1111      zoom_end_srv   = global_zoom_end; //zoom_end;
1112      zoom_n     = zoom_end_srv - zoom_begin + 1;
1113    }
1114  }
1115
1116
1117  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
1118  {
1119    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
1120    return transformationMap_.back().second;
1121  }
1122
1123  bool CAxis::hasTransformation()
1124  {
1125    return (!transformationMap_.empty());
1126  }
1127
1128  void CAxis::setTransformations(const TransMapTypes& axisTrans)
1129  {
1130    transformationMap_ = axisTrans;
1131  }
1132
1133  CAxis::TransMapTypes CAxis::getAllTransformations(void)
1134  {
1135    return transformationMap_;
1136  }
1137
1138  void CAxis::duplicateTransformation(CAxis* src)
1139  {
1140    if (src->hasTransformation())
1141    {
1142      this->setTransformations(src->getAllTransformations());
1143    }
1144  }
1145
1146  /*!
1147   * Go through the hierarchy to find the axis from which the transformations must be inherited
1148   */
1149  void CAxis::solveInheritanceTransformation()
1150  {
1151    if (hasTransformation() || !hasDirectAxisReference())
1152      return;
1153
1154    CAxis* axis = this;
1155    std::vector<CAxis*> refAxis;
1156    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
1157    {
1158      refAxis.push_back(axis);
1159      axis = axis->getDirectAxisReference();
1160    }
1161
1162    if (axis->hasTransformation())
1163      for (size_t i = 0; i < refAxis.size(); ++i)
1164        refAxis[i]->setTransformations(axis->getAllTransformations());
1165  }
1166
1167  void CAxis::parse(xml::CXMLNode & node)
1168  {
1169    SuperClass::parse(node);
1170
1171    if (node.goToChildElement())
1172    {
1173      StdString nodeElementName;
1174      do
1175      {
1176        StdString nodeId("");
1177        if (node.getAttributes().end() != node.getAttributes().find("id"))
1178        { nodeId = node.getAttributes()["id"]; }
1179
1180        nodeElementName = node.getElementName();
1181        std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_.end(), it;
1182        it = transformationMapList_.find(nodeElementName);
1183        if (ite != it)
1184        {
1185          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1186                                                                                                               nodeId,
1187                                                                                                               &node)));
1188        }
1189        else
1190        {
1191          ERROR("void CAxis::parse(xml::CXMLNode & node)",
1192                << "The transformation " << nodeElementName << " has not been supported yet.");
1193        }
1194      } while (node.goToNextElement()) ;
1195      node.goToParentElement();
1196    }
1197  }
1198
1199  DEFINE_REF_FUNC(Axis,axis)
1200
1201   ///---------------------------------------------------------------
1202
1203} // namespace xios
Note: See TracBrowser for help on using the repository browser.