source: XIOS/trunk/src/node/axis.cpp @ 815

Last change on this file since 815 was 815, checked in by mhnguyen, 8 years ago

Fixing the bug in ticket 72

+) The distributed axis on client side send info to correct corresponding server
+) Improve serverdistributiondescription class to make it more flexible
+) Create new test_basic_2D only for test cases of 2-d grid

Test
+) On Curie
+) All tests pass

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 30.0 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "inverse_axis.hpp"
13#include "zoom_axis.hpp"
14#include "interpolate_axis.hpp"
15#include "server_distribution_description.hpp"
16#include "client_server_mapping_distributed.hpp"
17#include "distribution_client.hpp"
18
19namespace xios {
20
21   /// ////////////////////// Définitions ////////////////////// ///
22
23   CAxis::CAxis(void)
24      : CObjectTemplate<CAxis>()
25      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
26      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
27      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
28      , transformationMap_(), global_zoom_begin(0), global_zoom_size(0)
29   {
30   }
31
32   CAxis::CAxis(const StdString & id)
33      : CObjectTemplate<CAxis>(id)
34      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
35      , isDistributed_(false), hasBounds_(false), isCompressible_(false)
36      , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)
37      , transformationMap_(), global_zoom_begin(0), global_zoom_size(0)
38   {
39   }
40
41   CAxis::~CAxis(void)
42   { /* Ne rien faire de plus */ }
43
44   ///---------------------------------------------------------------
45
46   const std::set<StdString> & CAxis::getRelFiles(void) const
47   {
48      return (this->relFiles);
49   }
50
51   bool CAxis::IsWritten(const StdString & filename) const
52   {
53      return (this->relFiles.find(filename) != this->relFiles.end());
54   }
55
56   bool CAxis::isWrittenCompressed(const StdString& filename) const
57   {
58      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
59   }
60
61   bool CAxis::isDistributed(void) const
62   {
63      return isDistributed_;
64   }
65
66   /*!
67    * Test whether the data defined on the axis can be outputted in a compressed way.
68    *
69    * \return true if and only if a mask was defined for this axis
70    */
71   bool CAxis::isCompressible(void) const
72   {
73      return isCompressible_;
74   }
75
76   void CAxis::addRelFile(const StdString & filename)
77   {
78      this->relFiles.insert(filename);
79   }
80
81   void CAxis::addRelFileCompressed(const StdString& filename)
82   {
83      this->relFilesCompressed.insert(filename);
84   }
85
86   //----------------------------------------------------------------
87
88   const std::vector<int>& CAxis::getIndexesToWrite(void) const
89   {
90     return indexesToWrite;
91   }
92
93   /*!
94     Returns the number of indexes written by each server.
95     \return the number of indexes written by each server
96   */
97   int CAxis::getNumberWrittenIndexes() const
98   {
99     return numberWrittenIndexes_;
100   }
101
102   /*!
103     Returns the total number of indexes written by the servers.
104     \return the total number of indexes written by the servers
105   */
106   int CAxis::getTotalNumberWrittenIndexes() const
107   {
108     return totalNumberWrittenIndexes_;
109   }
110
111   /*!
112     Returns the offset of indexes written by each server.
113     \return the offset of indexes written by each server
114   */
115   int CAxis::getOffsetWrittenIndexes() const
116   {
117     return offsetWrittenIndexes_;
118   }
119
120   //----------------------------------------------------------------
121
122   /*!
123    * Compute the minimum buffer size required to send the attributes to the server(s).
124    *
125    * \return A map associating the server rank with its minimum buffer size.
126    */
127   std::map<int, StdSize> CAxis::getAttributesBufferSize()
128   {
129     CContextClient* client = CContext::getCurrent()->client;
130
131     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes();
132
133     bool isNonDistributed = (n == n_glo);
134
135     if (client->isServerLeader())
136     {
137       // size estimation for sendServerAttribut
138       size_t size = 6 * sizeof(size_t);
139       // size estimation for sendNonDistributedValue
140       if (isNonDistributed)
141         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
142       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
143
144       const std::list<int>& ranks = client->getRanksServerLeader();
145       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
146       {
147         if (size > attributesSizes[*itRank])
148           attributesSizes[*itRank] = size;
149       }
150     }
151
152     if (!isNonDistributed)
153     {
154       // size estimation for sendDistributedValue
155       std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();
156       for (it = indSrv_.begin(); it != ite; ++it)
157       {
158         size_t sizeIndexEvent = CArray<int,1>::size(it->second.size());
159         if (isCompressible_)
160           sizeIndexEvent += CArray<int,1>::size(indWrittenSrv_[it->first].size());
161
162         size_t sizeValEvent = CArray<double,1>::size(it->second.size());
163         if (hasBounds_)
164           sizeValEvent += CArray<double,2>::size(2 * it->second.size());
165
166         size_t size = CEventClient::headerSize + getId().size() + sizeof(size_t) + std::max(sizeIndexEvent, sizeValEvent);
167         if (size > attributesSizes[it->first])
168           attributesSizes[it->first] = size;
169       }
170     }
171
172     return attributesSizes;
173   }
174
175   //----------------------------------------------------------------
176
177   StdString CAxis::GetName(void)   { return (StdString("axis")); }
178   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
179   ENodeType CAxis::GetType(void)   { return (eAxis); }
180
181   //----------------------------------------------------------------
182
183   CAxis* CAxis::createAxis()
184   {
185     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
186     return axis;
187   }
188
189   void CAxis::fillInValues(const CArray<double,1>& values)
190   {
191     this->value = values;
192   }
193
194   void CAxis::checkAttributes(void)
195   {
196      if (this->n_glo.isEmpty())
197        ERROR("CAxis::checkAttributes(void)",
198              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
199              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
200      StdSize size = this->n_glo.getValue();
201
202      isDistributed_ = !this->begin.isEmpty() || !this->n.isEmpty();
203
204      if (!this->begin.isEmpty())
205      {
206        if (begin < 0 || begin > size - 1)
207          ERROR("CAxis::checkAttributes(void)",
208                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
209                << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
210      }
211      else this->begin.setValue(0);
212
213      if (!this->n.isEmpty())
214      {
215        if (n < 0 || n > size)
216          ERROR("CAxis::checkAttributes(void)",
217                << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
218                << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
219      }
220      else this->n.setValue(size);
221
222      StdSize true_size = value.numElements();
223      if (this->n.getValue() != true_size)
224        ERROR("CAxis::checkAttributes(void)",
225              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
226              << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
227
228      this->checkData();
229      this->checkZoom();
230      this->checkMask();
231      this->checkBounds();
232   }
233
234   void CAxis::checkData()
235   {
236      if (data_begin.isEmpty()) data_begin.setValue(0);
237
238      if (data_n.isEmpty())
239      {
240        data_n.setValue(n);
241      }
242      else if (data_n.getValue() < 0)
243      {
244        ERROR("CAxis::checkData(void)",
245              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
246              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
247      }
248
249      if (data_index.isEmpty())
250      {
251        data_index.resize(data_n);
252        for (int i = 0; i < data_n; ++i) data_index(i) = i;
253      }
254   }
255
256   void CAxis::checkZoom(void)
257   {
258     if (0 == global_zoom_size) global_zoom_size = this->n_glo.getValue();
259   }
260
261   void CAxis::checkMask()
262   {
263      if (!mask.isEmpty())
264      {
265         if (mask.extent(0) != n)
266           ERROR("CAxis::checkMask(void)",
267                 << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
268                 << "The mask does not have the same size as the local domain." << std::endl
269                 << "Local size is " << n.getValue() << "." << std::endl
270                 << "Mask size is " << mask.extent(0) << ".");
271      }
272      else // (mask.isEmpty())
273      { // If no mask was defined, we create a default one without any masked point.
274         mask.resize(n);
275         for (int i = 0; i < n; ++i)
276         {
277           mask(i) = true;
278         }
279      }
280   }
281
282  void CAxis::checkBounds()
283  {
284    if (!bounds.isEmpty())
285    {
286      if (bounds.extent(0) != 2 || bounds.extent(1) != n)
287        ERROR("CAxis::checkAttributes(void)",
288              << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
289              << "Axis size is " << n.getValue() << "." << std::endl
290              << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
291      hasBounds_ = true;
292    }
293    else hasBounds_ = false;
294  }
295
296  void CAxis::checkEligibilityForCompressedOutput()
297  {
298    // We don't check if the mask is valid here, just if a mask has been defined at this point.
299    isCompressible_ = !mask.isEmpty();
300  }
301
302  bool CAxis::dispatchEvent(CEventServer& event)
303   {
304      if (SuperClass::dispatchEvent(event)) return true;
305      else
306      {
307        switch(event.type)
308        {
309           case EVENT_ID_SERVER_ATTRIBUT :
310             recvServerAttribut(event);
311             return true;
312             break;
313           case EVENT_ID_INDEX:
314            recvIndex(event);
315            return true;
316            break;
317          case EVENT_ID_DISTRIBUTED_VALUE:
318            recvDistributedValue(event);
319            return true;
320            break;
321          case EVENT_ID_NON_DISTRIBUTED_VALUE:
322            recvNonDistributedValue(event);
323            return true;
324            break;
325           default :
326             ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
327                    << "Unknown Event");
328           return false;
329         }
330      }
331   }
332
333   void CAxis::checkAttributesOnClient()
334   {
335     if (this->areClientAttributesChecked_) return;
336
337     this->checkAttributes();
338
339     this->areClientAttributesChecked_ = true;
340   }
341
342   // Send all checked attributes to server
343   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
344                                     CServerDistributionDescription::ServerDistributionType distType)
345   {
346     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
347     CContext* context = CContext::getCurrent();
348
349     if (this->isChecked) return;
350     if (context->hasClient)
351     {
352       sendServerAttribut(globalDim, orderPositionInGrid, distType);
353       sendValue(globalDim, orderPositionInGrid, distType);
354     }
355
356     this->isChecked = true;
357   }
358
359  void CAxis::sendValue(const std::vector<int>& globalDim, int orderPositionInGrid,
360                        CServerDistributionDescription::ServerDistributionType distType)
361  {
362     if (n.getValue() == n_glo.getValue())
363     {
364       sendNonDistributedValue();
365     }
366     else
367     {
368       computeConnectedServer(globalDim, orderPositionInGrid, distType);
369       sendDistributedValue();
370     }
371  }
372
373  void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid,
374                                     CServerDistributionDescription::ServerDistributionType distType)
375  {
376    CContext* context = CContext::getCurrent();
377    CContextClient* client = context->client;
378    int nbServer = client->serverSize;
379    int range, clientSize = client->clientSize;
380
381    size_t ni = this->n.getValue();
382    size_t ibegin = this->begin.getValue();
383    size_t zoom_end = global_zoom_begin+global_zoom_size-1;
384    size_t nZoomCount = 0;
385    for (size_t idx = 0; idx < ni; ++idx)
386    {
387      size_t globalIndex = ibegin + idx;
388
389      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount;
390    }
391
392    CArray<size_t,1> globalIndexAxis(ni);
393    std::vector<size_t> globalAxisZoom(nZoomCount);
394    nZoomCount = 0;
395    for (size_t idx = 0; idx < ni; ++idx)
396    {
397      size_t globalIndex = ibegin + idx;
398      globalIndexAxis(idx) = globalIndex;
399      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
400      {
401        globalAxisZoom[nZoomCount] = globalIndex;
402        ++nZoomCount;
403      }
404    }
405
406    std::set<int> writtenInd;
407    if (isCompressible_)
408    {
409      for (int idx = 0; idx < data_index.numElements(); ++idx)
410      {
411        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
412
413        if (ind >= 0 && ind < ni && mask(ind))
414        {
415          ind += ibegin;
416          if (ind >= global_zoom_begin && ind <= zoom_end)
417            writtenInd.insert(ind);
418        }
419      }
420    }
421
422    CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer);
423    int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed();
424    std::map<int, std::vector<size_t> > globalIndexAxisOnServer;
425    if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side*
426    {
427      std::vector<int> nGlobAxis(1);
428      nGlobAxis[0] = n_glo.getValue();
429
430      size_t globalSizeIndex = 1, indexBegin, indexEnd;
431      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
432      indexBegin = 0;
433      for (int i = 0; i < clientSize; ++i)
434      {
435        range = globalSizeIndex / clientSize;
436        if (i < (globalSizeIndex%clientSize)) ++range;
437        if (i == client->clientRank) break;
438        indexBegin += range;
439      }
440      indexEnd = indexBegin + range - 1;
441
442      CServerDistributionDescription serverDescription(nGlobAxis, nbServer);
443      serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd));
444      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
445      clientServerMap->computeServerIndexMapping(globalIndexAxis);
446      globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();
447      delete clientServerMap;
448    }
449    else
450    {
451      std::vector<size_t> globalIndexServer(n_glo.getValue());
452      for (size_t idx = 0; idx < n_glo.getValue(); ++idx)
453      {
454        globalIndexServer[idx] = idx;
455      }
456
457      for (int idx = 0; idx < nbServer; ++idx)
458      {
459        globalIndexAxisOnServer[idx] = globalIndexServer;
460      }
461    }
462
463    std::map<int, std::vector<size_t> >::const_iterator it = globalIndexAxisOnServer.begin(),
464                                                       ite = globalIndexAxisOnServer.end();
465    std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(),
466                                        iteVec = (globalAxisZoom).end();
467    indSrv_.clear();
468    indWrittenSrv_.clear();
469    for (; it != ite; ++it)
470    {
471      int rank = it->first;
472      const std::vector<size_t>& globalIndexTmp = it->second;
473      int nb = globalIndexTmp.size();
474
475      for (int i = 0; i < nb; ++i)
476      {
477        if (std::binary_search(itbVec, iteVec, globalIndexTmp[i]))
478        {
479          indSrv_[rank].push_back(globalIndexTmp[i]);
480        }
481
482        if (writtenInd.count(globalIndexTmp[i]))
483        {
484          indWrittenSrv_[rank].push_back(globalIndexTmp[i]);
485        }
486      }
487    }
488
489    connectedServerRank_.clear();
490    for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {
491      connectedServerRank_.push_back(it->first);
492    }
493
494    if (!indSrv_.empty())
495    {
496      connectedServerRank_.clear();
497      for (it = indSrv_.begin(); it != indSrv_.end(); ++it)
498        connectedServerRank_.push_back(it->first);
499    }
500    nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_);
501  }
502
503  void CAxis::sendNonDistributedValue()
504  {
505    CContext* context = CContext::getCurrent();
506    CContextClient* client = context->client;
507    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);
508
509    int zoom_end = global_zoom_begin + global_zoom_size - 1;
510    int nb = 0;
511    for (size_t idx = 0; idx < n; ++idx)
512    {
513      size_t globalIndex = begin + idx;
514      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb;
515    }
516
517    int nbWritten = 0;
518    if (isCompressible_)
519    {
520      for (int idx = 0; idx < data_index.numElements(); ++idx)
521      {
522        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
523
524        if (ind >= 0 && ind < n && mask(ind))
525        {
526          ind += begin;
527          if (ind >= global_zoom_begin && ind <= zoom_end)
528            ++nbWritten;
529        }
530      }
531    }
532
533    CArray<double,1> val(nb);
534    nb = 0;
535    for (size_t idx = 0; idx < n; ++idx)
536    {
537      size_t globalIndex = begin + idx;
538      if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end)
539      {
540        val(nb) = value(idx);
541        ++nb;
542      }
543    }
544
545    CArray<int, 1> writtenInd(nbWritten);
546    nbWritten = 0;
547    if (isCompressible_)
548    {
549      for (int idx = 0; idx < data_index.numElements(); ++idx)
550      {
551        int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n);
552
553        if (ind >= 0 && ind < n && mask(ind))
554        {
555          ind += begin;
556          if (ind >= global_zoom_begin && ind <= zoom_end)
557          {
558            writtenInd(nbWritten) = ind;
559            ++nbWritten;
560          }
561        }
562      }
563    }
564
565    if (client->isServerLeader())
566    {
567      std::list<CMessage> msgs;
568
569      const std::list<int>& ranks = client->getRanksServerLeader();
570      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
571      {
572        msgs.push_back(CMessage());
573        CMessage& msg = msgs.back();
574        msg << this->getId();
575        msg << val;
576        if (isCompressible_)
577          msg << writtenInd;
578        event.push(*itRank, 1, msg);
579      }
580      client->sendEvent(event);
581    }
582    else client->sendEvent(event);
583  }
584
585  void CAxis::sendDistributedValue(void)
586  {
587    int ns, n, i, j, ind, nv, idx;
588    CContext* context = CContext::getCurrent();
589    CContextClient* client=context->client;
590
591    // send value for each connected server
592    CEventClient eventIndex(getType(), EVENT_ID_INDEX);
593    CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE);
594
595    list<CMessage> list_msgsIndex, list_msgsVal;
596    list<CArray<int,1> > list_indi;
597    list<CArray<int,1> > list_writtenInd;
598    list<CArray<double,1> > list_val;
599    list<CArray<double,2> > list_bounds;
600
601    std::map<int, std::vector<size_t> >::const_iterator it, iteMap;
602    iteMap = indSrv_.end();
603    for (int k = 0; k < connectedServerRank_.size(); ++k)
604    {
605      int nbData = 0;
606      int rank = connectedServerRank_[k];
607      it = indSrv_.find(rank);
608      if (iteMap != it)
609        nbData = it->second.size();
610
611      list_indi.push_back(CArray<int,1>(nbData));
612      list_val.push_back(CArray<double,1>(nbData));
613
614      if (hasBounds_)
615      {
616        list_bounds.push_back(CArray<double,2>(2,nbData));
617      }
618
619      CArray<int,1>& indi = list_indi.back();
620      CArray<double,1>& val = list_val.back();
621
622      for (n = 0; n < nbData; ++n)
623      {
624        idx = static_cast<int>(it->second[n]);
625        ind = idx - begin;
626
627        val(n) = value(ind);
628        indi(n) = idx;
629
630        if (hasBounds_)
631        {
632          CArray<double,2>& boundsVal = list_bounds.back();
633          boundsVal(0, n) = bounds(0,n);
634          boundsVal(1, n) = bounds(1,n);
635        }
636      }
637
638      list_msgsIndex.push_back(CMessage());
639      list_msgsIndex.back() << this->getId() << list_indi.back();
640
641      if (isCompressible_)
642      {
643        std::vector<int>& writtenIndSrc = indWrittenSrv_[rank];
644        list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size()));
645        CArray<int,1>& writtenInd = list_writtenInd.back();
646
647        for (n = 0; n < writtenInd.numElements(); ++n)
648          writtenInd(n) = writtenIndSrc[n];
649
650        list_msgsIndex.back() << writtenInd;
651      }
652
653      list_msgsVal.push_back(CMessage());
654      list_msgsVal.back() << this->getId() << list_val.back();
655
656      if (hasBounds_)
657      {
658        list_msgsVal.back() << list_bounds.back();
659      }
660
661      eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back());
662      eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back());
663    }
664
665    client->sendEvent(eventIndex);
666    client->sendEvent(eventVal);
667  }
668
669  void CAxis::recvIndex(CEventServer& event)
670  {
671    CAxis* axis;
672
673    list<CEventServer::SSubEvent>::iterator it;
674    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
675    {
676      CBufferIn* buffer = it->buffer;
677      string axisId;
678      *buffer >> axisId;
679      axis = get(axisId);
680      axis->recvIndex(it->rank, *buffer);
681    }
682
683    if (axis->isCompressible_)
684    {
685      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
686
687      CContextServer* server = CContext::getCurrent()->server;
688      axis->numberWrittenIndexes_ = axis->indexesToWrite.size();
689      MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
690      MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm);
691      axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_;
692    }
693  }
694
695  void CAxis::recvIndex(int rank, CBufferIn& buffer)
696  {
697    buffer >> indiSrv_[rank];
698
699    if (isCompressible_)
700    {
701      CArray<int, 1> writtenIndexes;
702      buffer >> writtenIndexes;
703      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
704      for (int i = 0; i < writtenIndexes.numElements(); ++i)
705        indexesToWrite.push_back(writtenIndexes(i));
706    }
707  }
708
709  void CAxis::recvDistributedValue(CEventServer& event)
710  {
711    list<CEventServer::SSubEvent>::iterator it;
712    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
713    {
714      CBufferIn* buffer = it->buffer;
715      string axisId;
716      *buffer >> axisId;
717      get(axisId)->recvDistributedValue(it->rank, *buffer);
718    }
719  }
720
721  void CAxis::recvDistributedValue(int rank, CBufferIn& buffer)
722  {
723    CArray<int,1> &indi = indiSrv_[rank];
724    CArray<double,1> val;
725    CArray<double,2> boundsVal;
726
727    buffer >> val;
728    if (hasBounds_) buffer >> boundsVal;
729
730    int i, j, ind_srv;
731    for (int ind = 0; ind < indi.numElements(); ++ind)
732    {
733      i = indi(ind);
734      ind_srv = i - zoom_begin_srv;
735      value_srv(ind_srv) = val(ind);
736      if (hasBounds_)
737      {
738        bound_srv(0,ind_srv) = boundsVal(0, ind);
739        bound_srv(1,ind_srv) = boundsVal(1, ind);
740      }
741    }
742  }
743
744   void CAxis::recvNonDistributedValue(CEventServer& event)
745  {
746    CAxis* axis;
747
748    list<CEventServer::SSubEvent>::iterator it;
749    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
750    {
751      CBufferIn* buffer = it->buffer;
752      string axisId;
753      *buffer >> axisId;
754      axis = get(axisId);
755      axis->recvNonDistributedValue(it->rank, *buffer);
756    }
757
758    if (axis->isCompressible_)
759    {
760      std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end());
761
762      axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size();
763      axis->offsetWrittenIndexes_ = 0;
764    }
765  }
766
767  void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer)
768  {
769    CArray<double,1> val;
770    buffer >> val;
771
772    for (int ind = 0; ind < val.numElements(); ++ind)
773    {
774      value_srv(ind) = val(ind);
775      if (hasBounds_)
776      {
777        bound_srv(0,ind) = bounds(0,ind);
778        bound_srv(1,ind) = bounds(1,ind);
779      }
780    }
781
782    if (isCompressible_)
783    {
784      CArray<int, 1> writtenIndexes;
785      buffer >> writtenIndexes;
786      indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements());
787      for (int i = 0; i < writtenIndexes.numElements(); ++i)
788        indexesToWrite.push_back(writtenIndexes(i));
789    }
790  }
791
792  void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,
793                                 CServerDistributionDescription::ServerDistributionType distType)
794  {
795    CContext* context = CContext::getCurrent();
796    CContextClient* client = context->client;
797    int nbServer = client->serverSize;
798
799    CServerDistributionDescription serverDescription(globalDim, nbServer);
800    serverDescription.computeServerDistribution();
801
802    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
803    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
804
805    CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT);
806    if (client->isServerLeader())
807    {
808      std::list<CMessage> msgs;
809
810      const std::list<int>& ranks = client->getRanksServerLeader();
811      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
812      {
813        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
814        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
815        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
816        const int end   = begin + ni - 1;
817
818        msgs.push_back(CMessage());
819        CMessage& msg = msgs.back();
820        msg << this->getId();
821        msg << ni << begin << end;
822        msg << global_zoom_begin << global_zoom_size;
823        msg << isCompressible_;
824
825        event.push(*itRank,1,msg);
826      }
827      client->sendEvent(event);
828    }
829    else client->sendEvent(event);
830  }
831
832  void CAxis::recvServerAttribut(CEventServer& event)
833  {
834    CBufferIn* buffer = event.subEvents.begin()->buffer;
835    string axisId;
836    *buffer >> axisId;
837    get(axisId)->recvServerAttribut(*buffer);
838  }
839
840  void CAxis::recvServerAttribut(CBufferIn& buffer)
841  {
842    int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_size_tmp;
843
844    buffer >> ni_srv >> begin_srv >> end_srv;
845    buffer >> global_zoom_begin_tmp >> global_zoom_size_tmp;
846    buffer >> isCompressible_;
847    global_zoom_begin = global_zoom_begin_tmp;
848    global_zoom_size  = global_zoom_size_tmp;
849    int global_zoom_end = global_zoom_begin + global_zoom_size - 1;
850
851    zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ;
852    zoom_end_srv   = global_zoom_end < end_srv ? global_zoom_end : end_srv ;
853    zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
854
855    if (zoom_size_srv<=0)
856    {
857      zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0;
858    }
859
860    if (n_glo == n)
861    {
862      zoom_begin_srv = global_zoom_begin;
863      zoom_end_srv   = global_zoom_end; //zoom_end;
864      zoom_size_srv  = zoom_end_srv - zoom_begin_srv + 1;
865    }
866    value_srv.resize(zoom_size_srv);
867    bound_srv.resize(2,zoom_size_srv);
868  }
869
870  bool CAxis::hasTransformation()
871  {
872    return (!transformationMap_.empty());
873  }
874
875  void CAxis::setTransformations(const TransMapTypes& axisTrans)
876  {
877    transformationMap_ = axisTrans;
878  }
879
880  CAxis::TransMapTypes CAxis::getAllTransformations(void)
881  {
882    return transformationMap_;
883  }
884
885  /*!
886    Check the validity of all transformations applied on axis
887  This functions is called AFTER all inherited attributes are solved
888  */
889  void CAxis::checkTransformations()
890  {
891    TransMapTypes::const_iterator itb = transformationMap_.begin(), it,
892                                  ite = transformationMap_.end();
893    for (it = itb; it != ite; ++it)
894    {
895      (it->second)->checkValid(this);
896    }
897  }
898
899  /*!
900   * Go through the hierarchy to find the axis from which the transformations must be inherited
901   */
902  void CAxis::solveInheritanceTransformation()
903  {
904    if (hasTransformation() || !hasDirectAxisReference())
905      return;
906
907    CAxis* axis = this;
908    std::vector<CAxis*> refAxis;
909    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
910    {
911      refAxis.push_back(axis);
912      axis = axis->getDirectAxisReference();
913    }
914
915    if (axis->hasTransformation())
916      for (size_t i = 0; i < refAxis.size(); ++i)
917        refAxis[i]->setTransformations(axis->getAllTransformations());
918  }
919
920  void CAxis::parse(xml::CXMLNode & node)
921  {
922    SuperClass::parse(node);
923
924    if (node.goToChildElement())
925    {
926      StdString inverseAxisDefRoot("inverse_axis_definition");
927      StdString inverse("inverse_axis");
928      StdString zoomAxisDefRoot("zoom_axis_definition");
929      StdString zoom("zoom_axis");
930      StdString interpAxisDefRoot("interpolate_axis_definition");
931      StdString interp("interpolate_axis");
932      do
933      {
934        StdString nodeId("");
935        if (node.getAttributes().end() != node.getAttributes().find("id"))
936        { nodeId = node.getAttributes()["id"]; }
937
938        if (node.getElementName() == inverse) {
939          CInverseAxis* tmp = (CInverseAxisGroup::get(inverseAxisDefRoot))->createChild(nodeId);
940          tmp->parse(node);
941          transformationMap_.push_back(std::make_pair(TRANS_INVERSE_AXIS,tmp));
942        }
943        else if (node.getElementName() == zoom) {
944          CZoomAxis* tmp = (CZoomAxisGroup::get(zoomAxisDefRoot))->createChild(nodeId);
945          tmp->parse(node);
946          transformationMap_.push_back(std::make_pair(TRANS_ZOOM_AXIS,tmp));
947        }
948        else if (node.getElementName() == interp) {
949          CInterpolateAxis* tmp = (CInterpolateAxisGroup::get(interpAxisDefRoot))->createChild(nodeId);
950          tmp->parse(node);
951          transformationMap_.push_back(std::make_pair(TRANS_INTERPOLATE_AXIS,tmp));
952        }
953      } while (node.goToNextElement()) ;
954      node.goToParentElement();
955    }
956  }
957
958  DEFINE_REF_FUNC(Axis,axis)
959
960   ///---------------------------------------------------------------
961
962} // namespace xios
Note: See TracBrowser for help on using the repository browser.