source: XIOS/dev/dev_ym/XIOS_COUPLING/src/node/axis.cpp @ 1943

Last change on this file since 1943 was 1943, checked in by ymipsl, 4 years ago

Solve issues for grid mask on server side.

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:executable set to *
File size: 62.2 KB
Line 
1#include "axis.hpp"
2
3#include "attribute_template.hpp"
4#include "object_template.hpp"
5#include "group_template.hpp"
6#include "message.hpp"
7#include "type.hpp"
8#include "context.hpp"
9#include "context_client.hpp"
10#include "context_server.hpp"
11#include "xios_spl.hpp"
12#include "server_distribution_description.hpp"
13#include "client_server_mapping_distributed.hpp"
14#include "distribution_client.hpp"
15
16namespace xios {
17
18   /// ////////////////////// Definitions ////////////////////// ///
19
20   CAxis::CAxis(void)
21      : CObjectTemplate<CAxis>()
22      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
23      , isClientAfterTransformationChecked(false)
24      , hasBounds(false), isCompressible_(false)
25      , numberWrittenIndexes_(), totalNumberWrittenIndexes_(), offsetWrittenIndexes_()
26      , transformationMap_(), hasValue(false), hasLabel(false)
27      , computedWrittenIndex_(false)
28          , clients()
29   {
30   }
31
32   CAxis::CAxis(const StdString & id)
33      : CObjectTemplate<CAxis>(id)
34      , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false)
35      , isClientAfterTransformationChecked(false)
36      , hasBounds(false), isCompressible_(false)
37      , numberWrittenIndexes_(), totalNumberWrittenIndexes_(), offsetWrittenIndexes_()
38      , transformationMap_(), hasValue(false), hasLabel(false)
39      , computedWrittenIndex_(false)
40          , clients()
41   {
42   }
43
44   CAxis::~CAxis(void)
45   { /* Ne rien faire de plus */ }
46
47   std::map<StdString, ETranformationType> CAxis::transformationMapList_ = std::map<StdString, ETranformationType>();
48   bool CAxis::dummyTransformationMapList_ = CAxis::initializeTransformationMap(CAxis::transformationMapList_);
49   bool CAxis::initializeTransformationMap(std::map<StdString, ETranformationType>& m)
50   TRY
51   {
52     m["zoom_axis"] = TRANS_ZOOM_AXIS;
53     m["interpolate_axis"] = TRANS_INTERPOLATE_AXIS;
54     m["extract_axis"] = TRANS_EXTRACT_AXIS;
55     m["inverse_axis"] = TRANS_INVERSE_AXIS;
56     m["reduce_domain"] = TRANS_REDUCE_DOMAIN_TO_AXIS;
57     m["reduce_axis"] = TRANS_REDUCE_AXIS_TO_AXIS;
58     m["extract_domain"] = TRANS_EXTRACT_DOMAIN_TO_AXIS;
59     m["temporal_splitting"] = TRANS_TEMPORAL_SPLITTING;
60     m["duplicate_scalar"] = TRANS_DUPLICATE_SCALAR_TO_AXIS;
61
62   }
63   CATCH
64
65   ///---------------------------------------------------------------
66
67   const std::set<StdString> & CAxis::getRelFiles(void) const
68   TRY
69   {
70      return (this->relFiles);
71   }
72   CATCH
73
74   bool CAxis::IsWritten(const StdString & filename) const
75   TRY
76   {
77      return (this->relFiles.find(filename) != this->relFiles.end());
78   }
79   CATCH
80
81   bool CAxis::isWrittenCompressed(const StdString& filename) const
82   TRY
83   {
84      return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end());
85   }
86   CATCH
87
88   bool CAxis::isDistributed(void) const
89   TRY
90   {
91      bool distributed = (!this->begin.isEmpty() && !this->n.isEmpty() && (this->begin + this->n < this->n_glo)) ||
92             (!this->n.isEmpty() && (this->n != this->n_glo));
93      // A condition to make sure that if there is only one client, axis
94      // should be considered to be distributed. This should be a temporary solution     
95      distributed |= (1 == CContext::getCurrent()->intraCommSize_);
96      return distributed;
97   }
98   CATCH
99
100   /*!
101    * Test whether the data defined on the axis can be outputted in a compressed way.
102    *
103    * \return true if and only if a mask was defined for this axis
104    */
105   bool CAxis::isCompressible(void) const
106   TRY
107   {
108      return isCompressible_;
109   }
110   CATCH
111
112   void CAxis::addRelFile(const StdString & filename)
113   TRY
114   {
115      this->relFiles.insert(filename);
116   }
117   CATCH_DUMP_ATTR
118
119   void CAxis::addRelFileCompressed(const StdString& filename)
120   TRY
121   {
122      this->relFilesCompressed.insert(filename);
123   }
124   CATCH_DUMP_ATTR
125
126   //----------------------------------------------------------------
127
128   /*!
129     Returns the number of indexes written by each server.
130     \return the number of indexes written by each server
131   */
132   int CAxis::getNumberWrittenIndexes(MPI_Comm writtenCom)
133   TRY
134   {
135     int writtenSize;
136     MPI_Comm_size(writtenCom, &writtenSize);
137     return numberWrittenIndexes_[writtenSize];
138   }
139   CATCH_DUMP_ATTR
140
141   /*!
142     Returns the total number of indexes written by the servers.
143     \return the total number of indexes written by the servers
144   */
145   int CAxis::getTotalNumberWrittenIndexes(MPI_Comm writtenCom)
146   TRY
147   {
148     int writtenSize;
149     MPI_Comm_size(writtenCom, &writtenSize);
150     return totalNumberWrittenIndexes_[writtenSize];
151   }
152   CATCH_DUMP_ATTR
153
154   /*!
155     Returns the offset of indexes written by each server.
156     \return the offset of indexes written by each server
157   */
158   int CAxis::getOffsetWrittenIndexes(MPI_Comm writtenCom)
159   TRY
160   {
161     int writtenSize;
162     MPI_Comm_size(writtenCom, &writtenSize);
163     return offsetWrittenIndexes_[writtenSize];
164   }
165   CATCH_DUMP_ATTR
166
167   CArray<int, 1>& CAxis::getCompressedIndexToWriteOnServer(MPI_Comm writtenCom)
168   TRY
169   {
170     int writtenSize;
171     MPI_Comm_size(writtenCom, &writtenSize);
172     return compressedIndexToWriteOnServer[writtenSize];
173   }
174   CATCH_DUMP_ATTR
175
176   //----------------------------------------------------------------
177
178   /*!
179    * Compute the minimum buffer size required to send the attributes to the server(s).
180    *
181    * \return A map associating the server rank with its minimum buffer size.
182    */
183   std::map<int, StdSize> CAxis::getAttributesBufferSize(CContextClient* client, const std::vector<int>& globalDim, int orderPositionInGrid,
184                                                         CServerDistributionDescription::ServerDistributionType distType)
185   TRY
186   {
187
188     std::map<int, StdSize> attributesSizes = getMinimumBufferSizeForAttributes(client);
189
190//     bool isNonDistributed = (n_glo == n);
191     bool isDistributed = (orderPositionInGrid == CServerDistributionDescription::defaultDistributedDimension(globalDim.size(), distType))
192                                 || (index.numElements() != n_glo);
193
194     if (client->isServerLeader())
195     {
196       // size estimation for sendServerAttribut
197       size_t size = 6 * sizeof(size_t);
198       // size estimation for sendNonDistributedValue
199       if (!isDistributed)
200       {
201//         size = std::max(size, CArray<double,1>::size(n_glo) + (isCompressible_ ? CArray<int,1>::size(n_glo) : 0));
202         size += CArray<int,1>::size(n_glo);
203         size += CArray<int,1>::size(n_glo);
204         size += CArray<bool,1>::size(n_glo);
205         size += CArray<double,1>::size(n_glo);
206         if (hasBounds)
207           size += CArray<double,2>::size(2*n_glo);
208         if (hasLabel)
209          size += CArray<StdString,1>::size(n_glo);
210       }
211       size += CEventClient::headerSize + getId().size() + sizeof(size_t);
212
213       const std::list<int>& ranks = client->getRanksServerLeader();
214       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
215       {
216         if (size > attributesSizes[*itRank])
217           attributesSizes[*itRank] = size;
218       }
219       const std::list<int>& ranksNonLeaders = client->getRanksServerNotLeader();
220       for (std::list<int>::const_iterator itRank = ranksNonLeaders.begin(), itRankEnd = ranksNonLeaders.end(); itRank != itRankEnd; ++itRank)
221       {
222         if (size > attributesSizes[*itRank])
223           attributesSizes[*itRank] = size;
224       }
225
226     }
227
228     if (isDistributed)
229     {
230       // size estimation for sendDistributedValue
231       std::unordered_map<int, vector<size_t> >::const_iterator it, ite = indSrv_[client->serverSize].end();
232       for (it = indSrv_[client->serverSize].begin(); it != ite; ++it)
233       {
234         size_t size = 6 * sizeof(size_t);
235         size += CArray<int,1>::size(it->second.size());
236         size += CArray<int,1>::size(it->second.size());
237         size += CArray<bool,1>::size(it->second.size());
238         size += CArray<double,1>::size(it->second.size());
239         if (hasBounds)
240           size += CArray<double,2>::size(2 * it->second.size());
241         if (hasLabel)
242           size += CArray<StdString,1>::size(it->second.size());
243
244         size += CEventClient::headerSize + getId().size() + sizeof(size_t);
245         if (size > attributesSizes[it->first])
246           attributesSizes[it->first] = size;
247       }
248     }
249     return attributesSizes;
250   }
251   CATCH_DUMP_ATTR
252
253   //----------------------------------------------------------------
254
255   StdString CAxis::GetName(void)   { return (StdString("axis")); }
256   StdString CAxis::GetDefName(void){ return (CAxis::GetName()); }
257   ENodeType CAxis::GetType(void)   { return (eAxis); }
258
259   //----------------------------------------------------------------
260
261   CAxis* CAxis::createAxis()
262   TRY
263   {
264     CAxis* axis = CAxisGroup::get("axis_definition")->createChild();
265     return axis;
266   }
267   CATCH
268
269   /*!
270     Check common attributes of an axis.
271     This check should be done in the very beginning of work flow
272   */
273   void CAxis::checkAttributes(void)
274   TRY
275   {
276     if (checkAttributes_done_) return ;
277
278     CContext* context=CContext::getCurrent();
279
280     if (this->n_glo.isEmpty())
281        ERROR("CAxis::checkAttributes(void)",
282              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
283              << "The axis is wrongly defined, attribute 'n_glo' must be specified");
284      StdSize size = this->n_glo.getValue();
285
286      if (!this->index.isEmpty())
287      {
288        if (n.isEmpty()) n = index.numElements();
289
290        // It's not so correct but if begin is not the first value of index
291        // then data on the local axis has user-defined distribution. In this case, begin has no meaning.
292        if (begin.isEmpty()) begin = index(0);         
293      }
294      else 
295      {
296        if (!this->begin.isEmpty())
297        {
298          if (begin < 0 || begin > size - 1)
299            ERROR("CAxis::checkAttributes(void)",
300                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
301                  << "The axis is wrongly defined, attribute 'begin' (" << begin.getValue() << ") must be non-negative and smaller than size-1 (" << size - 1 << ").");
302        }
303        else this->begin.setValue(0);
304
305        if (!this->n.isEmpty())
306        {
307          if (n < 0 || n > size)
308            ERROR("CAxis::checkAttributes(void)",
309                  << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
310                  << "The axis is wrongly defined, attribute 'n' (" << n.getValue() << ") must be non-negative and smaller than size (" << size << ").");
311        }
312        else this->n.setValue(size);
313
314        {
315          index.resize(n);
316          for (int i = 0; i < n; ++i) index(i) = i+begin;
317        }
318      }
319
320      if (!this->value.isEmpty())
321      {
322        StdSize true_size = value.numElements();
323        if (this->n.getValue() != true_size)
324          ERROR("CAxis::checkAttributes(void)",
325              << "[ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] "
326              << "The axis is wrongly defined, attribute 'value' has a different size (" << true_size
327              << ") than the one defined by the \'size\' attribute (" << n.getValue() << ").");
328        this->hasValue = true;
329      }
330
331      this->checkBounds();
332      this->checkMask();
333      this->checkData();
334      this->checkLabel();
335      initializeLocalElement() ;
336      addFullView() ;
337      addWorkflowView() ;
338      addModelView() ;
339
340      checkAttributes_done_ = true ;
341   }
342   CATCH_DUMP_ATTR
343
344
345
346   /*!
347      Check the validity of data, fill in values if any, and apply mask.
348   */
349   void CAxis::checkData()
350   TRY
351   {
352      if (data_begin.isEmpty()) data_begin.setValue(0);
353
354      if (data_n.isEmpty())
355      {
356        data_n.setValue(n);
357      }
358      else if (data_n.getValue() < 0)
359      {
360        ERROR("CAxis::checkData(void)",
361              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
362              << "The data size should be strictly positive ('data_n' = " << data_n.getValue() << ").");
363      }
364
365      if (data_index.isEmpty())
366      {
367        data_index.resize(data_n);
368        for (int i = 0; i < data_n; ++i)
369        {
370          if ((i+data_begin) >= 0 && (i+data_begin<n))
371          {
372            if (mask(i+data_begin))
373              data_index(i) = i+data_begin;
374            else
375              data_index(i) = -1;
376          }
377          else
378            data_index(i) = -1;
379        }
380      }
381      else
382      {
383        if (data_index.numElements() != data_n)
384        {
385          ERROR("CAxis::checkData(void)",
386                << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
387                << "The size of data_index = "<< data_index.numElements() << "is not equal to the data size data_n = " << data_n.getValue() << ").");
388        }
389        for (int i = 0; i < data_n; ++i)
390        {
391           if (data_index(i) >= 0 && data_index(i)<n)
392             if (!mask(data_index(i))) data_index(i) = -1;
393        }
394      }
395
396   }
397   CATCH_DUMP_ATTR
398
399    size_t CAxis::getGlobalWrittenSize(void)
400    {
401      return n_glo ;
402    }
403
404   /*!
405     Check validity of mask info and fill in values if any.
406   */
407   void CAxis::checkMask()
408   TRY
409   {
410      if (!mask.isEmpty())
411      {
412        if (mask.extent(0) != n)
413        {
414          ERROR("CAxis::checkMask(void)",
415              << "[ id = " << this->getId() << " , context = '" << CObjectFactory::GetCurrentContextId() << " ] "
416              << "The mask does not have the same size as the local domain." << std::endl
417              << "Local size is " << n.getValue() << "." << std::endl
418              << "Mask size is " << mask.extent(0) << ".");
419        }
420      }
421      else
422      {
423        mask.resize(n);
424        mask = true;
425      }
426   }
427   CATCH_DUMP_ATTR
428
429   /*!
430     Check validity of bounds info and fill in values if any.
431   */
432   void CAxis::checkBounds()
433   TRY
434   {
435     if (!bounds.isEmpty())
436     {
437       if (bounds.extent(0) != 2 || bounds.extent(1) != n)
438         ERROR("CAxis::checkAttributes(void)",
439               << "The bounds array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension 2 x axis size." << std::endl
440               << "Axis size is " << n.getValue() << "." << std::endl
441               << "Bounds size is "<< bounds.extent(0) << " x " << bounds.extent(1) << ".");
442       hasBounds = true;
443     }
444     else hasBounds = false;
445   }
446   CATCH_DUMP_ATTR
447
448  void CAxis::checkLabel()
449  TRY
450  {
451    if (!label.isEmpty())
452    {
453      if (label.extent(0) != n)
454        ERROR("CAxis::checkLabel(void)",
455              << "The label array of the axis [ id = '" << getId() << "' , context = '" << CObjectFactory::GetCurrentContextId() << "' ] must be of dimension of axis size." << std::endl
456              << "Axis size is " << n.getValue() << "." << std::endl
457              << "label size is "<< label.extent(0)<<  " .");
458      hasLabel = true;
459    }
460    else hasLabel = false;
461  }
462  CATCH_DUMP_ATTR
463
464  /*!
465    Check whether we can do compressed output
466  */
467  void CAxis::checkEligibilityForCompressedOutput()
468  TRY
469  {
470    // We don't check if the mask is valid here, just if a mask has been defined at this point.
471    isCompressible_ = !mask.isEmpty();
472  }
473  CATCH_DUMP_ATTR
474
475  /*!
476    Dispatch event from the lower communication layer then process event according to its type
477  */
478  bool CAxis::dispatchEvent(CEventServer& event)
479  TRY
480  {
481     if (SuperClass::dispatchEvent(event)) return true;
482     else
483     {
484       switch(event.type)
485       {
486          case EVENT_ID_DISTRIBUTION_ATTRIBUTE :
487            recvDistributionAttribute(event);
488            return true;
489            break;
490         case EVENT_ID_NON_DISTRIBUTED_ATTRIBUTES:
491           recvNonDistributedAttributes(event);
492           return true;
493           break;
494         case EVENT_ID_DISTRIBUTED_ATTRIBUTES:
495           recvDistributedAttributes_old(event);
496           return true;
497           break;
498         case EVENT_ID_AXIS_DISTRIBUTION:
499           recvAxisDistribution(event);
500           return true;
501           break;
502         case EVENT_ID_SEND_DISTRIBUTED_ATTRIBUTE:
503           recvDistributedAttributes(event);
504           return true;
505           break;
506          default :
507            ERROR("bool CAxis::dispatchEvent(CEventServer& event)",
508                   << "Unknown Event");
509          return false;
510        }
511     }
512  }
513  CATCH
514
515   /*!
516     Check attributes on client side (This name is still adequate???)
517   */
518   void CAxis::checkAttributesOnClient()
519   TRY
520   {
521     if (this->areClientAttributesChecked_) return;
522
523     CContext* context=CContext::getCurrent();
524     if (context->getServiceType()==CServicesManager::CLIENT) this->checkAttributes();
525
526     this->areClientAttributesChecked_ = true;
527   }
528   CATCH_DUMP_ATTR
529
530   /*
531     The (spatial) transformation sometimes can change attributes of an axis (e.g zoom can change mask or generate can change whole attributes)
532     Therefore, we should recheck them.
533   */
534   // ym : obsolete to be removed
535   void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,
536                                                          CServerDistributionDescription::ServerDistributionType distType)
537   TRY
538   {
539     CContext* context=CContext::getCurrent() ;
540
541     if (this->isClientAfterTransformationChecked) return;
542     if (context->getServiceType()==CServicesManager::CLIENT || context->getServiceType()==CServicesManager::GATHERER)
543     {       
544       /* suppressed because of interface changed
545       if (orderPositionInGrid == CServerDistributionDescription::defaultDistributedDimension(globalDim.size(), distType))
546         computeConnectedClients(globalDim, orderPositionInGrid, distType);
547       else if (index.numElements() != n_glo) computeConnectedClients(globalDim, orderPositionInGrid,  CServerDistributionDescription::ROOT_DISTRIBUTION);
548       */
549     }
550
551     this->isClientAfterTransformationChecked = true;
552   }
553   CATCH_DUMP_ATTR
554
555   /*
556     Send all checked attributes to server? (We dont have notion of server any more so client==server)
557     \param [in] globalDim global dimension of grid containing this axis
558     \param [in] orderPositionInGrid the relative order of this axis in the grid (e.g grid composed of domain+axis -> orderPositionInGrid is 2)
559     \param [in] distType distribution type of the server. For now, we only have band distribution.
560
561   */
562   //ym obsolete : to be removed
563   void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,
564                                     CServerDistributionDescription::ServerDistributionType distType)
565   TRY
566   {
567     if (!this->areClientAttributesChecked_) checkAttributesOnClient();
568     if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType);
569     CContext* context = CContext::getCurrent();
570
571     if (this->isChecked) return;
572     if (context->getServiceType()==CServicesManager::CLIENT || context->getServiceType()==CServicesManager::GATHERER) /*sendAttributes(globalDim, orderPositionInGrid, distType)*/;   
573
574     this->isChecked = true;
575   }
576   CATCH_DUMP_ATTR
577
578 
579   void CAxis::sendAxisToFileServer(CContextClient* client, const std::vector<int>& globalDim, int orderPositionInGrid)
580   {
581     if (sendAxisToFileServer_done_.count(client)!=0) return ;
582     else sendAxisToFileServer_done_.insert(client) ;
583     
584     StdString axisDefRoot("axis_definition");
585     CAxisGroup* axisPtr = CAxisGroup::get(axisDefRoot);
586     axisPtr->sendCreateChild(this->getId(),client);
587     this->sendAllAttributesToServer(client)  ; 
588     this->sendAttributes(client, globalDim, orderPositionInGrid, CServerDistributionDescription::BAND_DISTRIBUTION) ;
589   }
590
591   void CAxis::sendAxisToCouplerOut(CContextClient* client, const std::vector<int>& globalDim, int orderPositionInGrid, const string& fieldId, int posInGrid)
592   {
593     if (sendAxisToFileServer_done_.count(client)!=0) return ;
594     else sendAxisToFileServer_done_.insert(client) ;
595     
596     string axisId="_axis["+std::to_string(posInGrid)+"]_of_"+fieldId ;
597
598     if (!axis_ref.isEmpty())
599    {
600      auto axis_ref_tmp=axis_ref.getValue() ;
601      axis_ref.reset() ; // remove the reference, find an other way to do that more cleanly
602      this->sendAllAttributesToServer(client, axisId)  ; 
603      axis_ref = axis_ref_tmp ;
604    }
605    else this->sendAllAttributesToServer(client, axisId)  ; 
606 
607    this->sendAttributes(client, globalDim, orderPositionInGrid, CServerDistributionDescription::BAND_DISTRIBUTION, axisId) ;
608   }
609
610  void CAxis::makeAliasForCoupling(const string& fieldId, int posInGrid)
611  {
612    const string axisId = "_axis["+std::to_string(posInGrid)+"]_of_"+fieldId ;
613    this->createAlias(axisId) ;
614  }
615
616  /*!
617    Send attributes from one client to other clients
618    \param[in] globalDim global dimension of grid which contains this axis
619    \param[in] order
620  */
621  void CAxis::sendAttributes(CContextClient* client, const std::vector<int>& globalDim, int orderPositionInGrid,
622                             CServerDistributionDescription::ServerDistributionType distType, const string& axisId)
623  TRY
624  {
625     sendDistributionAttribute(client, globalDim, orderPositionInGrid, distType, axisId);
626
627     // if (index.numElements() == n_glo.getValue())
628     if ((orderPositionInGrid == CServerDistributionDescription::defaultDistributedDimension(globalDim.size(), distType))
629         || (index.numElements() != n_glo))
630     {
631       sendDistributedAttributes_old(client, axisId);       
632     }
633     else
634     {
635       sendNonDistributedAttributes(client, axisId);   
636     }     
637  }
638  CATCH_DUMP_ATTR
639
640  /*
641    Compute the connection between group of clients (or clients/servers).
642    (E.g: Suppose we have 2 group of clients in two model: A (client role) connect to B (server role),
643    this function calculate number of clients B connect to one client of A)
644     \param [in] globalDim global dimension of grid containing this axis
645     \param [in] orderPositionInGrid the relative order of this axis in the grid (e.g grid composed of domain+axis -> orderPositionInGrid is 2)
646     \param [in] distType distribution type of the server. For now, we only have band distribution.
647  */
648  void CAxis::computeConnectedClients(CContextClient* client, const std::vector<int>& globalDim, int orderPositionInGrid)
649  TRY
650  {
651    if (computeConnectedClients_done_.count(client)!=0) return ;
652    else computeConnectedClients_done_.insert(client) ;
653
654    CContext* context = CContext::getCurrent();
655    CServerDistributionDescription::ServerDistributionType distType ;
656    int defaultDistributedPos = CServerDistributionDescription::defaultDistributedDimension(globalDim.size(), CServerDistributionDescription::BAND_DISTRIBUTION) ;
657   
658    if (orderPositionInGrid == defaultDistributedPos) distType =  CServerDistributionDescription::BAND_DISTRIBUTION ;
659    else if (index.numElements() != n_glo) distType =  CServerDistributionDescription::ROOT_DISTRIBUTION ;
660    else return ;
661
662    int nbServer = client->serverSize;
663    int range, clientSize = client->clientSize;
664    int rank = client->clientRank;
665
666    if (listNbServer_.count(nbServer) == 0)
667    {
668      listNbServer_.insert(nbServer) ;
669
670      if (connectedServerRank_.find(nbServer) != connectedServerRank_.end())
671      {
672        nbSenders.erase(nbServer);
673        connectedServerRank_.erase(nbServer);
674      }
675
676      size_t ni = this->n.getValue();
677      size_t ibegin = this->begin.getValue();
678      size_t nbIndex = index.numElements();
679
680      // First of all, we should compute the mapping of the global index and local index of the current client
681      if (globalLocalIndexMap_.empty())
682      {
683        for (size_t idx = 0; idx < nbIndex; ++idx)
684        {
685          globalLocalIndexMap_[index(idx)] = idx;
686        }
687      }
688
689      // Calculate the compressed index if any
690      //        std::set<int> writtenInd;
691      //        if (isCompressible_)
692      //        {
693      //          for (int idx = 0; idx < data_index.numElements(); ++idx)
694      //          {
695      //            int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni);
696      //
697      //            if (ind >= 0 && ind < ni && mask(ind))
698      //            {
699      //              ind += ibegin;
700      //              writtenInd.insert(ind);
701      //            }
702      //          }
703      //        }
704
705      // Compute the global index of the current client (process) hold
706      std::vector<int> nGlobAxis(1);
707      nGlobAxis[0] = n_glo.getValue();
708
709      size_t globalSizeIndex = 1, indexBegin, indexEnd;
710      for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i];
711      indexBegin = 0;
712      if (globalSizeIndex <= clientSize)
713      {
714        indexBegin = rank%globalSizeIndex;
715        indexEnd = indexBegin;
716      }
717      else
718      {
719        for (int i = 0; i < clientSize; ++i)
720        {
721          range = globalSizeIndex / clientSize;
722          if (i < (globalSizeIndex%clientSize)) ++range;
723          if (i == client->clientRank) break;
724          indexBegin += range;
725        }
726        indexEnd = indexBegin + range - 1;
727      }
728
729      CArray<size_t,1> globalIndex(index.numElements());
730      for (size_t idx = 0; idx < globalIndex.numElements(); ++idx)
731        globalIndex(idx) = index(idx);
732
733      // Describe the distribution of server side
734
735      CServerDistributionDescription serverDescription(nGlobAxis, nbServer, distType);
736   
737      std::vector<int> serverZeroIndex;
738      serverZeroIndex = serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t&,size_t&>(indexBegin, indexEnd), 0);
739
740      std::list<int> serverZeroIndexLeader;
741      std::list<int> serverZeroIndexNotLeader; 
742      CContextClient::computeLeader(client->clientRank, client->clientSize, serverZeroIndex.size(), serverZeroIndexLeader, serverZeroIndexNotLeader);
743      for (std::list<int>::iterator it = serverZeroIndexLeader.begin(); it != serverZeroIndexLeader.end(); ++it)
744        *it = serverZeroIndex[*it];
745
746      // Find out the connection between client and server side
747      CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm);
748      clientServerMap->computeServerIndexMapping(globalIndex, nbServer);
749      CClientServerMapping::GlobalIndexMap& globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer();     
750
751      indSrv_[nbServer].swap(globalIndexAxisOnServer);
752
753      if (distType==CServerDistributionDescription::ROOT_DISTRIBUTION)
754      {
755        for(int i=1; i<nbServer; ++i) indSrv_[nbServer].insert(pair<int, vector<size_t> >(i,indSrv_[nbServer][0]) ) ;
756        serverZeroIndexLeader.clear() ;
757      }
758       
759      CClientServerMapping::GlobalIndexMap::const_iterator it  = indSrv_[nbServer].begin(),
760                                                           ite = indSrv_[nbServer].end();
761
762      for (it = indSrv_[nbServer].begin(); it != ite; ++it) connectedServerRank_[nbServer].push_back(it->first);
763
764      for (std::list<int>::const_iterator it = serverZeroIndexLeader.begin(); it != serverZeroIndexLeader.end(); ++it)
765        connectedServerRank_[nbServer].push_back(*it);
766
767       // Even if a client has no index, it must connect to at least one server and
768       // send an "empty" data to this server
769       if (connectedServerRank_[nbServer].empty())
770        connectedServerRank_[nbServer].push_back(client->clientRank % client->serverSize);
771
772      nbSenders[nbServer] = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_[nbServer]);
773
774      delete clientServerMap;
775    }
776  }
777  CATCH_DUMP_ATTR
778
779  /*
780    Compute the index of data to write into file
781    (Different from the previous version, this version of XIOS allows data be written into file (classical role),
782    or transfered to another clients)
783  */
784  void CAxis::computeWrittenIndex()
785  TRY
786  { 
787    if (computedWrittenIndex_) return;
788    computedWrittenIndex_ = true;
789
790    CContext* context=CContext::getCurrent();     
791
792    // We describe the distribution of client (server) on which data are written
793    std::vector<int> nBegin(1), nSize(1), nBeginGlobal(1), nGlob(1);
794    nBegin[0]       = begin;
795    nSize[0]        = n;
796    nBeginGlobal[0] = 0; 
797    nGlob[0]        = n_glo;
798    CDistributionServer srvDist(context->intraCommSize_, nBegin, nSize, nBeginGlobal, nGlob); 
799    const CArray<size_t,1>& writtenGlobalIndex  = srvDist.getGlobalIndex();
800
801    // Because all written data are local on a client,
802    // we need to compute the local index on the server from its corresponding global index
803    size_t nbWritten = 0, indGlo;     
804    std::unordered_map<size_t,size_t>::const_iterator itb = globalLocalIndexMap_.begin(),
805                                                        ite = globalLocalIndexMap_.end(), it;         
806    CArray<size_t,1>::const_iterator itSrvb = writtenGlobalIndex.begin(),
807                                     itSrve = writtenGlobalIndex.end(), itSrv; 
808
809    localIndexToWriteOnServer.resize(writtenGlobalIndex.numElements());
810    nbWritten = 0;
811    for (itSrv = itSrvb; itSrv != itSrve; ++itSrv)
812    {
813      indGlo = *itSrv;
814      if (ite != globalLocalIndexMap_.find(indGlo))
815      {
816        localIndexToWriteOnServer(nbWritten) = globalLocalIndexMap_[indGlo];
817      }
818      else
819      {
820        localIndexToWriteOnServer(nbWritten) = -1;
821      }
822      ++nbWritten;
823    }
824
825  }
826  CATCH_DUMP_ATTR
827
828  void CAxis::computeWrittenCompressedIndex(MPI_Comm writtenComm)
829  TRY
830  {
831    int writtenCommSize;
832    MPI_Comm_size(writtenComm, &writtenCommSize);
833    if (compressedIndexToWriteOnServer.find(writtenCommSize) != compressedIndexToWriteOnServer.end())
834      return;
835
836    if (isCompressible())
837    {
838      size_t nbWritten = 0, indGlo;
839      CContext* context=CContext::getCurrent();     
840 
841      // We describe the distribution of client (server) on which data are written
842      std::vector<int> nBegin(1), nSize(1), nBeginGlobal(1), nGlob(1);
843      nBegin[0]       = 0;
844      nSize[0]        = n;
845      nBeginGlobal[0] = 0; 
846      nGlob[0]        = n_glo;
847      CDistributionServer srvDist(context->intraCommSize_, nBegin, nSize, nBeginGlobal, nGlob); 
848      const CArray<size_t,1>& writtenGlobalIndex  = srvDist.getGlobalIndex();
849      std::unordered_map<size_t,size_t>::const_iterator itb = globalLocalIndexMap_.begin(),
850                                                          ite = globalLocalIndexMap_.end(), it;   
851
852      CArray<size_t,1>::const_iterator itSrvb = writtenGlobalIndex.begin(),
853                                       itSrve = writtenGlobalIndex.end(), itSrv;
854      std::unordered_map<size_t,size_t> localGlobalIndexMap;
855      for (itSrv = itSrvb; itSrv != itSrve; ++itSrv)
856      {
857        indGlo = *itSrv;
858        if (ite != globalLocalIndexMap_.find(indGlo))
859        {
860          localGlobalIndexMap[localIndexToWriteOnServer(nbWritten)] = indGlo;
861          ++nbWritten;
862        }                 
863      }
864//
865//      nbWritten = 0;
866//      for (int idx = 0; idx < data_index.numElements(); ++idx)
867//      {
868//        if (localGlobalIndexMap.end() != localGlobalIndexMap.find(data_index(idx)))
869//        {
870//          ++nbWritten;
871//        }
872//      }
873//
874//      compressedIndexToWriteOnServer[writtenCommSize].resize(nbWritten);
875//      nbWritten = 0;
876//      for (int idx = 0; idx < data_index.numElements(); ++idx)
877//      {
878//        if (localGlobalIndexMap.end() != localGlobalIndexMap.find(data_index(idx)))
879//        {
880//          compressedIndexToWriteOnServer[writtenCommSize](nbWritten) = localGlobalIndexMap[data_index(idx)];
881//          ++nbWritten;
882//        }
883//      }
884
885      nbWritten = 0;
886      for (int idx = 0; idx < data_index.numElements(); ++idx)
887      {
888        if (localGlobalIndexMap.end() != localGlobalIndexMap.find(data_index(idx)))
889        {
890          ++nbWritten;
891        }
892      }
893
894      compressedIndexToWriteOnServer[writtenCommSize].resize(nbWritten);
895      nbWritten = 0;
896      for (int idx = 0; idx < data_index.numElements(); ++idx)
897      {
898        if (localGlobalIndexMap.end() != localGlobalIndexMap.find(data_index(idx)))
899        {
900          compressedIndexToWriteOnServer[writtenCommSize](nbWritten) = localGlobalIndexMap[data_index(idx)];
901          ++nbWritten;
902        }
903      }
904
905      numberWrittenIndexes_[writtenCommSize] = nbWritten;
906
907      bool distributed_glo, distributed=isDistributed() ;
908      MPI_Allreduce(&distributed,&distributed_glo, 1, MPI_INT, MPI_LOR, writtenComm) ;
909      if (distributed_glo)
910      {
911             
912        MPI_Allreduce(&numberWrittenIndexes_[writtenCommSize], &totalNumberWrittenIndexes_[writtenCommSize], 1, MPI_INT, MPI_SUM, writtenComm);
913        MPI_Scan(&numberWrittenIndexes_[writtenCommSize], &offsetWrittenIndexes_[writtenCommSize], 1, MPI_INT, MPI_SUM, writtenComm);
914        offsetWrittenIndexes_[writtenCommSize] -= numberWrittenIndexes_[writtenCommSize];
915      }
916      else
917        totalNumberWrittenIndexes_[writtenCommSize] = numberWrittenIndexes_[writtenCommSize];
918    }
919  }
920  CATCH_DUMP_ATTR
921
922  /*!
923    Send distribution information from a group of client (client role) to another group of client (server role)
924    The distribution of a group of client (server role) is imposed by the group of client (client role)
925    \param [in] globalDim global dimension of grid containing this axis
926    \param [in] orderPositionInGrid the relative order of this axis in the grid (e.g grid composed of domain+axis -> orderPositionInGrid is 2)
927    \param [in] distType distribution type of the server. For now, we only have band distribution.
928  */
929  void CAxis::sendDistributionAttribute(CContextClient* client, const std::vector<int>& globalDim, int orderPositionInGrid,
930                                        CServerDistributionDescription::ServerDistributionType distType, const string& axisId)
931  TRY
932  {
933    string serverAxisId = axisId.empty() ? this->getId() : axisId ; 
934    int nbServer = client->serverSize;
935
936    CServerDistributionDescription serverDescription(globalDim, nbServer);
937    serverDescription.computeServerDistribution();
938
939    std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin();
940    std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes();
941
942    CEventClient event(getType(),EVENT_ID_DISTRIBUTION_ATTRIBUTE);
943    if (client->isServerLeader())
944    {
945      std::list<CMessage> msgs;
946
947      const std::list<int>& ranks = client->getRanksServerLeader();
948      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
949      {
950        // Use const int to ensure CMessage holds a copy of the value instead of just a reference
951        const int begin = serverIndexBegin[*itRank][orderPositionInGrid];
952        const int ni    = serverDimensionSizes[*itRank][orderPositionInGrid];
953
954        msgs.push_back(CMessage());
955        CMessage& msg = msgs.back();
956        msg << serverAxisId;
957        msg << ni << begin;
958        msg << isCompressible_;                   
959
960        event.push(*itRank,1,msg);
961      }
962      client->sendEvent(event);
963    }
964    else client->sendEvent(event);
965  }
966  CATCH_DUMP_ATTR
967
968  /*
969    Receive distribution attribute from another client
970    \param [in] event event containing data of these attributes
971  */
972  void CAxis::recvDistributionAttribute(CEventServer& event)
973  TRY
974  {
975    CBufferIn* buffer = event.subEvents.begin()->buffer;
976    string axisId;
977    *buffer >> axisId;
978    get(axisId)->recvDistributionAttribute(*buffer);
979  }
980  CATCH
981
982  /*
983    Receive distribution attribute from another client
984    \param [in] buffer buffer containing data of these attributes
985  */
986  void CAxis::recvDistributionAttribute(CBufferIn& buffer)
987  TRY
988  {
989    int ni_srv, begin_srv;
990    buffer >> ni_srv >> begin_srv;
991    buffer >> isCompressible_;           
992
993    // Set up new local size of axis on the receiving clients
994    n.setValue(ni_srv);
995    begin.setValue(begin_srv);
996  }
997  CATCH_DUMP_ATTR
998
999  /*
1000    Send attributes of axis from a group of client to other group of clients/servers
1001    on supposing that these attributes are not distributed among the sending group
1002    In the future, if new attributes are added, they should also be processed in this function
1003  */
1004  void CAxis::sendNonDistributedAttributes(CContextClient* client, const string& axisId)
1005  TRY
1006  {
1007    string serverAxisId = axisId.empty() ? this->getId() : axisId ; 
1008
1009    CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_ATTRIBUTES);
1010    size_t nbIndex = index.numElements();
1011    size_t nbDataIndex = 0;
1012
1013    for (int idx = 0; idx < data_index.numElements(); ++idx)
1014    {
1015      int ind = data_index(idx);
1016      if (ind >= 0 && ind < nbIndex) ++nbDataIndex;
1017    }
1018
1019    CArray<int,1> dataIndex(nbDataIndex);
1020    nbDataIndex = 0;
1021    for (int idx = 0; idx < data_index.numElements(); ++idx)
1022    {
1023      int ind = data_index(idx);
1024      if (ind >= 0 && ind < nbIndex)
1025      {
1026        dataIndex(nbDataIndex) = ind;
1027        ++nbDataIndex;
1028      }
1029    }
1030
1031    if (client->isServerLeader())
1032    {
1033      std::list<CMessage> msgs;
1034
1035      const std::list<int>& ranks = client->getRanksServerLeader();
1036      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1037      {
1038        msgs.push_back(CMessage());
1039        CMessage& msg = msgs.back();
1040        msg << serverAxisId;
1041        msg << index.getValue() << dataIndex << mask.getValue();
1042        msg << hasValue;
1043        if (hasValue) msg << value.getValue();
1044        msg << hasBounds;
1045        if (hasBounds) msg << bounds.getValue();
1046        msg << hasLabel;
1047        if (hasLabel) msg << label.getValue();
1048
1049        event.push(*itRank, 1, msg);
1050      }
1051      client->sendEvent(event);
1052    }
1053    else client->sendEvent(event);
1054  }
1055  CATCH_DUMP_ATTR
1056
1057  /*
1058    Receive the non-distributed attributes from another group of clients
1059    \param [in] event event containing data of these attributes
1060  */
1061  void CAxis::recvNonDistributedAttributes(CEventServer& event)
1062  TRY
1063  {
1064    list<CEventServer::SSubEvent>::iterator it;
1065    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
1066    {
1067      CBufferIn* buffer = it->buffer;
1068      string axisId;
1069      *buffer >> axisId;
1070      get(axisId)->recvNonDistributedAttributes(it->rank, *buffer);
1071    }
1072  }
1073  CATCH
1074
1075  /*
1076    Receive the non-distributed attributes from another group of clients
1077    \param [in] rank rank of the sender
1078    \param [in] buffer buffer containing data sent from the sender
1079  */
1080  void CAxis::recvNonDistributedAttributes(int rank, CBufferIn& buffer)
1081  TRY
1082  { 
1083    CArray<int,1> tmp_index, tmp_data_index;
1084    CArray<bool,1> tmp_mask;
1085    CArray<double,1> tmp_val;
1086    CArray<double,2> tmp_bnds;
1087    CArray<string,1> tmp_label;
1088
1089    buffer >> tmp_index;
1090    index.reference(tmp_index);
1091    buffer >> tmp_data_index;
1092    data_index.reference(tmp_data_index);
1093    buffer >> tmp_mask;
1094    mask.reference(tmp_mask);
1095
1096    buffer >> hasValue;
1097    if (hasValue)
1098    {
1099      buffer >> tmp_val;
1100      value.reference(tmp_val);
1101    }
1102
1103    buffer >> hasBounds;
1104    if (hasBounds)
1105    {
1106      buffer >> tmp_bnds;
1107      bounds.reference(tmp_bnds);
1108    }
1109
1110    buffer >> hasLabel;
1111    if (hasLabel)
1112    {
1113      buffer >> tmp_label;
1114      label.reference(tmp_label);
1115    }
1116
1117    // Some value should be reset here
1118    data_begin.setValue(0);
1119    data_n.setValue(data_index.numElements());
1120    globalLocalIndexMap_.rehash(std::ceil(index.numElements()/globalLocalIndexMap_.max_load_factor()));
1121//    for (int idx = 0; idx < index.numElements(); ++idx) globalLocalIndexMap_[idx] = index(idx);
1122    for (int idx = 0; idx < index.numElements(); ++idx) globalLocalIndexMap_[index(idx)] = idx;
1123  }
1124  CATCH_DUMP_ATTR
1125
1126  /*
1127    Send axis attributes from a group of clients to another group of clients/servers
1128    supposing that these attributes are distributed among the clients of the sending group
1129    In future, if new attributes are added, they should also be processed in this function
1130  */
1131  void CAxis::sendDistributedAttributes_old(CContextClient* client, const string& axisId)
1132  TRY
1133  {
1134    string serverAxisId = axisId.empty() ? this->getId() : axisId ; 
1135   
1136    int ind, idx;
1137    int nbServer = client->serverSize;
1138
1139    CEventClient eventData(getType(), EVENT_ID_DISTRIBUTED_ATTRIBUTES);
1140
1141    list<CMessage> listData;
1142    list<CArray<int,1> > list_indi, list_dataInd;
1143    list<CArray<double,1> > list_val;
1144    list<CArray<double,2> > list_bounds;
1145    list<CArray<string,1> > list_label;
1146
1147    // Cut off the ghost points
1148    int nbIndex = index.numElements();
1149    CArray<int,1> dataIndex(nbIndex);
1150    dataIndex = -1;
1151    for (idx = 0; idx < data_index.numElements(); ++idx)
1152    {
1153      if (0 <= data_index(idx) && data_index(idx) < nbIndex)
1154        dataIndex(data_index(idx)) = 1;
1155    }
1156
1157    std::unordered_map<int, std::vector<size_t> >::const_iterator it, iteMap;
1158    iteMap = indSrv_[nbServer].end();
1159    for (int k = 0; k < connectedServerRank_[nbServer].size(); ++k)
1160    {
1161      int nbData = 0, nbDataCount = 0;
1162      int rank = connectedServerRank_[nbServer][k];
1163      it = indSrv_[nbServer].find(rank);
1164      if (iteMap != it)
1165        nbData = it->second.size();
1166
1167      list_indi.push_back(CArray<int,1>(nbData));
1168      list_dataInd.push_back(CArray<int,1>(nbData));
1169
1170      if (hasValue)
1171        list_val.push_back(CArray<double,1>(nbData));
1172
1173      if (hasBounds)       
1174        list_bounds.push_back(CArray<double,2>(2,nbData));
1175
1176      if (hasLabel)
1177        list_label.push_back(CArray<string,1>(nbData));
1178
1179      CArray<int,1>& indi = list_indi.back();
1180      CArray<int,1>& dataIndi = list_dataInd.back();
1181      dataIndi = -1;
1182
1183      for (int n = 0; n < nbData; ++n)
1184      {
1185        idx = static_cast<int>(it->second[n]);
1186        indi(n) = idx;
1187
1188        ind = globalLocalIndexMap_[idx];
1189        dataIndi(n) = dataIndex(ind);
1190
1191        if (hasValue)
1192        {
1193          CArray<double,1>& val = list_val.back();
1194          val(n) = value(ind);
1195        }
1196
1197        if (hasBounds)
1198        {
1199          CArray<double,2>& boundsVal = list_bounds.back();
1200          boundsVal(0, n) = bounds(0,ind);
1201          boundsVal(1, n) = bounds(1,ind);
1202        }
1203
1204        if (hasLabel)
1205        {
1206          CArray<string,1>& labelVal = list_label.back();
1207          labelVal(n) = label(ind); 
1208        }
1209      }
1210
1211      listData.push_back(CMessage());
1212      listData.back() << serverAxisId
1213                      << list_indi.back() << list_dataInd.back();
1214
1215      listData.back() << hasValue;
1216      if (hasValue)
1217        listData.back() << list_val.back();
1218
1219      listData.back() << hasBounds;
1220      if (hasBounds)
1221        listData.back() << list_bounds.back();
1222
1223      listData.back() << hasLabel;
1224      if (hasLabel)
1225        listData.back() << list_label.back();
1226
1227      eventData.push(rank, nbSenders[nbServer][rank], listData.back());
1228    }
1229
1230    client->sendEvent(eventData);
1231  }
1232  CATCH_DUMP_ATTR
1233
1234  /*
1235    Receive the distributed attributes from another group of clients
1236    \param [in] event event containing data of these attributes
1237  */
1238  void CAxis::recvDistributedAttributes_old(CEventServer& event)
1239  TRY
1240  {
1241    string axisId;
1242    vector<int> ranks;
1243    vector<CBufferIn*> buffers;
1244
1245    list<CEventServer::SSubEvent>::iterator it;
1246    for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it)
1247    {
1248      ranks.push_back(it->rank);
1249      CBufferIn* buffer = it->buffer;
1250      *buffer >> axisId;
1251      buffers.push_back(buffer);
1252    }
1253    get(axisId)->recvDistributedAttributes_old(ranks, buffers);
1254  }
1255  CATCH
1256
1257  /*
1258    Receive the non-distributed attributes from another group of clients
1259    \param [in] ranks rank of the sender
1260    \param [in] buffers buffer containing data sent from the sender
1261  */
1262  void CAxis::recvDistributedAttributes_old(vector<int>& ranks, vector<CBufferIn*> buffers)
1263  TRY
1264  {
1265    int nbReceived = ranks.size(), idx, ind, gloInd, locInd;
1266    vector<CArray<int,1> > vec_indi(nbReceived), vec_dataInd(nbReceived);
1267    vector<CArray<double,1> > vec_val(nbReceived);
1268    vector<CArray<double,2> > vec_bounds(nbReceived);
1269    vector<CArray<string,1> > vec_label(nbReceived);
1270   
1271    for (idx = 0; idx < nbReceived; ++idx)
1272    {     
1273      CBufferIn& buffer = *buffers[idx];
1274      buffer >> vec_indi[idx];
1275      buffer >> vec_dataInd[idx];     
1276
1277      buffer >> hasValue;
1278      if (hasValue)
1279        buffer >> vec_val[idx];
1280
1281      buffer >> hasBounds;
1282      if (hasBounds)
1283        buffer >> vec_bounds[idx];
1284
1285      buffer >> hasLabel;
1286      if (hasLabel)
1287        buffer >> vec_label[idx]; 
1288    }
1289
1290    // Estimate size of index array
1291    int nbIndexGlob = 0;
1292    for (idx = 0; idx < nbReceived; ++idx)
1293    {
1294      nbIndexGlob += vec_indi[idx].numElements();
1295    }
1296
1297    // Recompute global index
1298    // Take account of the overlapped index
1299    index.resize(nbIndexGlob);
1300    globalLocalIndexMap_.rehash(std::ceil(index.numElements()/globalLocalIndexMap_.max_load_factor()));
1301    nbIndexGlob = 0;
1302    int nbIndLoc = 0;
1303    for (idx = 0; idx < nbReceived; ++idx)
1304    {
1305      CArray<int,1>& tmp = vec_indi[idx];
1306      for (ind = 0; ind < tmp.numElements(); ++ind)
1307      {
1308         gloInd = tmp(ind);
1309         nbIndLoc = (gloInd % n_glo)-begin;
1310         if (0 == globalLocalIndexMap_.count(gloInd))
1311         {
1312           index(nbIndexGlob) = gloInd % n_glo;
1313           globalLocalIndexMap_[gloInd] = nbIndexGlob;
1314           ++nbIndexGlob;
1315         } 
1316      } 
1317    }
1318
1319    // Resize index to its real size
1320    if (nbIndexGlob==0) index.resize(nbIndexGlob) ;
1321    else index.resizeAndPreserve(nbIndexGlob);
1322
1323    int nbData = nbIndexGlob;
1324    CArray<int,1> nonCompressedData(nbData);
1325    nonCompressedData = -1;   
1326    // Mask is incorporated into data_index and is not sent/received anymore
1327    mask.reset();
1328    if (hasValue)
1329      value.resize(nbData);
1330    if (hasBounds)
1331      bounds.resize(2,nbData);
1332    if (hasLabel)
1333      label.resize(nbData);
1334
1335    nbData = 0;
1336    for (idx = 0; idx < nbReceived; ++idx)
1337    {
1338      CArray<int,1>& indi = vec_indi[idx];
1339      CArray<int,1>& dataIndi = vec_dataInd[idx];
1340      int nb = indi.numElements();
1341      for (int n = 0; n < nb; ++n)
1342      { 
1343        locInd = globalLocalIndexMap_[size_t(indi(n))];
1344
1345        nonCompressedData(locInd) = (-1 == nonCompressedData(locInd)) ? dataIndi(n) : nonCompressedData(locInd);
1346
1347        if (hasValue)
1348          value(locInd) = vec_val[idx](n);
1349
1350        if (hasBounds)
1351        {
1352          bounds(0,locInd) = vec_bounds[idx](0,n);
1353          bounds(1,locInd) = vec_bounds[idx](1,n);
1354        }
1355
1356        if (hasLabel)
1357          label(locInd) = vec_label[idx](n);
1358      }
1359    }
1360   
1361    int nbCompressedData = 0;
1362    for (idx = 0; idx < nonCompressedData.numElements(); ++idx)
1363    {
1364      if (0 <= nonCompressedData(idx))
1365        ++nbCompressedData;
1366    }
1367
1368    data_index.resize(nbCompressedData);
1369    nbCompressedData = 0;
1370    for (idx = 0; idx < nonCompressedData.numElements(); ++idx)
1371    {
1372      if (0 <= nonCompressedData(idx))
1373      {
1374        data_index(nbCompressedData) = idx % n;
1375        ++nbCompressedData;
1376      }
1377    }
1378
1379    data_begin.setValue(0);
1380    data_n.setValue(data_index.numElements());
1381  }
1382  CATCH_DUMP_ATTR
1383
1384  /*!
1385    Compare two axis objects.
1386    They are equal if only if they have identical attributes as well as their values.
1387    Moreover, they must have the same transformations.
1388  \param [in] axis Compared axis
1389  \return result of the comparison
1390  */
1391  bool CAxis::isEqual(CAxis* obj)
1392  TRY
1393  {
1394    vector<StdString> excludedAttr;
1395    excludedAttr.push_back("axis_ref");
1396
1397    bool objEqual = SuperClass::isEqual(obj, excludedAttr);   
1398    if (!objEqual) return objEqual;
1399
1400    TransMapTypes thisTrans = this->getAllTransformations();
1401    TransMapTypes objTrans  = obj->getAllTransformations();
1402
1403    TransMapTypes::const_iterator it, itb, ite;
1404    std::vector<ETranformationType> thisTransType, objTransType;
1405    for (it = thisTrans.begin(); it != thisTrans.end(); ++it)
1406      thisTransType.push_back(it->first);
1407    for (it = objTrans.begin(); it != objTrans.end(); ++it)
1408      objTransType.push_back(it->first);
1409
1410    if (thisTransType.size() != objTransType.size()) return false;
1411    for (int idx = 0; idx < thisTransType.size(); ++idx)
1412      objEqual &= (thisTransType[idx] == objTransType[idx]);
1413
1414    return objEqual;
1415  }
1416  CATCH_DUMP_ATTR
1417
1418  /*
1419    Add transformation into axis. This function only servers for Fortran interface
1420    \param [in] transType transformation type
1421    \param [in] id identifier of the transformation object
1422  */
1423  CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id)
1424  TRY
1425  {
1426    transformationMap_.push_back(std::make_pair(transType, CTransformation<CAxis>::createTransformation(transType,id)));
1427    return transformationMap_.back().second;
1428  }
1429  CATCH_DUMP_ATTR
1430
1431  /*
1432    Check whether an axis has (spatial) transformation
1433  */
1434  bool CAxis::hasTransformation()
1435  TRY
1436  {
1437    return (!transformationMap_.empty());
1438  }
1439  CATCH_DUMP_ATTR
1440
1441  /*
1442    Set transformation
1443    \param [in] axisTrans transformation to set
1444  */
1445  void CAxis::setTransformations(const TransMapTypes& axisTrans)
1446  TRY
1447  {
1448    transformationMap_ = axisTrans;
1449  }
1450  CATCH_DUMP_ATTR
1451
1452  /*
1453    Return all transformation held by the axis
1454    \return transformation the axis has
1455  */
1456  CAxis::TransMapTypes CAxis::getAllTransformations(void)
1457  TRY
1458  {
1459    return transformationMap_;
1460  }
1461  CATCH_DUMP_ATTR
1462
1463  /*
1464    Duplicate transformation of another axis
1465    \param [in] src axis whose transformations are copied
1466  */
1467  void CAxis::duplicateTransformation(CAxis* src)
1468  TRY
1469  {
1470    if (src->hasTransformation())
1471    {
1472      this->setTransformations(src->getAllTransformations());
1473    }
1474  }
1475  CATCH_DUMP_ATTR
1476
1477  /*!
1478   * Go through the hierarchy to find the axis from which the transformations must be inherited
1479   */
1480  void CAxis::solveInheritanceTransformation()
1481  TRY
1482  {
1483    if (hasTransformation() || !hasDirectAxisReference())
1484      return;
1485
1486    CAxis* axis = this;
1487    std::vector<CAxis*> refAxis;
1488    while (!axis->hasTransformation() && axis->hasDirectAxisReference())
1489    {
1490      refAxis.push_back(axis);
1491      axis = axis->getDirectAxisReference();
1492    }
1493
1494    if (axis->hasTransformation())
1495      for (size_t i = 0; i < refAxis.size(); ++i)
1496        refAxis[i]->setTransformations(axis->getAllTransformations());
1497  }
1498  CATCH_DUMP_ATTR
1499
1500  void CAxis::setContextClient(CContextClient* contextClient)
1501  TRY
1502  {
1503    if (clientsSet.find(contextClient)==clientsSet.end())
1504    {
1505      clients.push_back(contextClient) ;
1506      clientsSet.insert(contextClient);
1507    }
1508  }
1509  CATCH_DUMP_ATTR
1510
1511  void CAxis::parse(xml::CXMLNode & node)
1512  TRY
1513  {
1514    SuperClass::parse(node);
1515
1516    if (node.goToChildElement())
1517    {
1518      StdString nodeElementName;
1519      do
1520      {
1521        StdString nodeId("");
1522        if (node.getAttributes().end() != node.getAttributes().find("id"))
1523        { nodeId = node.getAttributes()["id"]; }
1524
1525        nodeElementName = node.getElementName();
1526        std::map<StdString, ETranformationType>::const_iterator ite = transformationMapList_.end(), it;
1527        it = transformationMapList_.find(nodeElementName);
1528        if (ite != it)
1529        {
1530          transformationMap_.push_back(std::make_pair(it->second, CTransformation<CAxis>::createTransformation(it->second,
1531                                                                                                               nodeId,
1532                                                                                                               &node)));
1533        }
1534        else
1535        {
1536          ERROR("void CAxis::parse(xml::CXMLNode & node)",
1537                << "The transformation " << nodeElementName << " has not been supported yet.");
1538        }
1539      } while (node.goToNextElement()) ;
1540      node.goToParentElement();
1541    }
1542  }
1543  CATCH_DUMP_ATTR
1544
1545
1546   //////////////////////////////////////////////////////////////////////////////////////
1547   //  this part is related to distribution, element definition, views and connectors  //
1548   //////////////////////////////////////////////////////////////////////////////////////
1549
1550   void CAxis::initializeLocalElement(void)
1551   {
1552      // after checkAttribute index of size n
1553      int rank = CContext::getCurrent()->getIntraCommRank() ;
1554     
1555      CArray<size_t,1> ind(n) ;
1556      for (int i=0;i<n;i++) ind(i)=index(i) ;
1557
1558      localElement_ = new CLocalElement(rank, n_glo, ind) ;
1559   }
1560
1561   void CAxis::addFullView(void)
1562   {
1563      CArray<int,1> index(n) ;
1564      for(int i=0; i<n ; i++) index(i)=i ;
1565      localElement_ -> addView(CElementView::FULL, index) ;
1566   }
1567
1568   void CAxis::addWorkflowView(void)
1569   {
1570     // mask + data are included into data_index
1571     int nk=data_index.numElements() ;
1572     int nMask=0 ;
1573     for(int k=0;k<nk;k++) if (data_index(k)>=0 && data_index(k)<n) nMask++ ;
1574     
1575     CArray<int,1> index(nMask) ;
1576     nMask=0 ;
1577     for(int k=0;k<nk;k++) 
1578       if (data_index(k)>=0 && data_index(k)<n) 
1579       {
1580         index(nMask) = data_index(k) ;
1581         nMask++ ;
1582       }
1583     localElement_ -> addView(CElementView::WORKFLOW, index) ;
1584   }
1585
1586   void CAxis::addModelView(void)
1587   {
1588     // information for model view is stored in data_index
1589     localElement_->addView(CElementView::MODEL, data_index) ;
1590   }
1591
1592   void CAxis::computeModelToWorkflowConnector(void)
1593   { 
1594     CLocalView* srcView=getLocalView(CElementView::MODEL) ;
1595     CLocalView* dstView=getLocalView(CElementView::WORKFLOW) ;
1596     modelToWorkflowConnector_ = new CLocalConnector(srcView, dstView); 
1597     modelToWorkflowConnector_->computeConnector() ;
1598   }
1599
1600
1601   void CAxis::computeRemoteElement(CContextClient* client, EDistributionType type)
1602  {
1603    CContext* context = CContext::getCurrent();
1604    map<int, CArray<size_t,1>> globalIndex ;
1605
1606    if (type==EDistributionType::BANDS) // Bands distribution to send to file server
1607    {
1608      int nbServer = client->serverSize;
1609      int nbClient = client->clientSize ;
1610      int rankClient = client->clientRank ;
1611      int size = nbServer / nbClient ;
1612      int start ;
1613      if (nbServer%nbClient > rankClient)
1614      {
1615       start = (size+1) * rankClient ;
1616       size++ ;
1617      }
1618      else start = size*rankClient + nbServer%nbClient ;
1619     
1620      for(int i=0; i<size; i++)
1621      { 
1622        int rank=start+i ; 
1623        size_t indSize = n_glo/nbServer ;
1624        size_t indStart ;
1625        if (n_glo % nbServer > rank)
1626        {
1627          indStart = (indSize+1) * rank ;
1628          indSize++ ;
1629        }
1630        else indStart = indSize*rank + n_glo%nbServer ;
1631       
1632        auto& globalInd =  globalIndex[rank] ;
1633        globalInd.resize(indSize) ;
1634        for(size_t n = 0 ; n<indSize; n++) globalInd(n)=indStart+n ;
1635      }
1636    }
1637    else if (type==EDistributionType::NONE) // domain is not distributed ie all servers get the same local domain
1638    {
1639      int nbServer = client->serverSize;
1640      size_t nglo=n_glo ;
1641      CArray<size_t,1> indGlo(nglo) ;
1642      for(size_t i=0;i<nglo;i++) indGlo(i) = i ;
1643      for (auto& rankServer : client->getRanksServerLeader()) globalIndex[rankServer].reference(indGlo.copy()); 
1644    }
1645    remoteElement_[client] = new CDistributedElement(n_glo, globalIndex) ;
1646    remoteElement_[client]->addFullView() ;
1647  }
1648 
1649  void CAxis::distributeToServer(CContextClient* client, std::map<int, CArray<size_t,1>>& globalIndex, const string& axisId)
1650  {
1651    string serverAxisId = axisId.empty() ? this->getId() : axisId ;
1652    CContext* context = CContext::getCurrent();
1653
1654    this->sendAllAttributesToServer(client, serverAxisId)  ;
1655
1656    CDistributedElement scatteredElement(n_glo,globalIndex) ;
1657    scatteredElement.addFullView() ;
1658    CScattererConnector scattererConnector(localElement_->getView(CElementView::FULL), scatteredElement.getView(CElementView::FULL), 
1659                                           context->getIntraComm(), client->getRemoteSize()) ;
1660    scattererConnector.computeConnector() ;
1661   
1662    // phase 0
1663    // send remote element to construct the full view on server, ie without hole
1664    CEventClient event0(getType(), EVENT_ID_AXIS_DISTRIBUTION);
1665    CMessage message0 ;
1666    message0<<serverAxisId<<0 ; 
1667    remoteElement_[client]->sendToServer(client,event0,message0) ; 
1668   
1669    // phase 1
1670    // send the full view of element to construct the connector which connect distributed data coming from client to the full local view
1671    CEventClient event1(getType(), EVENT_ID_AXIS_DISTRIBUTION);
1672    CMessage message1 ;
1673    message1<<serverAxisId<<1<<localElement_->getView(CElementView::FULL)->getGlobalSize() ; 
1674    scattererConnector.transfer(localElement_->getView(CElementView::FULL)->getGlobalIndex(),client,event1,message1) ;
1675
1676    sendDistributedAttributes(client, scattererConnector, axisId) ;
1677 
1678    // phase 2 send the mask : data index + mask2D
1679    CArray<bool,1> maskIn(localElement_->getView(CElementView::WORKFLOW)->getSize());
1680    CArray<bool,1> maskOut ;
1681    CLocalConnector workflowToFull(localElement_->getView(CElementView::WORKFLOW), localElement_->getView(CElementView::FULL)) ;
1682    workflowToFull.computeConnector() ;
1683    maskIn=true ;
1684    workflowToFull.transfer(maskIn,maskOut,false) ;
1685
1686    // phase 3 : prepare grid scatterer connector to send data from client to server
1687    map<int,CArray<size_t,1>> workflowGlobalIndex ;
1688    map<int,CArray<bool,1>> maskOut2 ; 
1689    scattererConnector.transfer(maskOut, maskOut2) ;
1690    scatteredElement.addView(CElementView::WORKFLOW, maskOut2) ;
1691    scatteredElement.getView(CElementView::WORKFLOW)->getGlobalIndexView(workflowGlobalIndex) ;
1692    // create new workflow view for scattered element
1693    CDistributedElement clientToServerElement(scatteredElement.getGlobalSize(), workflowGlobalIndex) ;
1694    clientToServerElement.addFullView() ;
1695    CEventClient event2(getType(), EVENT_ID_AXIS_DISTRIBUTION);
1696    CMessage message2 ;
1697    message2<<serverAxisId<<2 ; 
1698    clientToServerElement.sendToServer(client, event2, message2) ; 
1699    clientToServerConnector_[client] = new CScattererConnector(localElement_->getView(CElementView::WORKFLOW), clientToServerElement.getView(CElementView::FULL), 
1700                                                              context->getIntraComm(), client->getRemoteSize()) ;
1701    clientToServerConnector_[client]->computeConnector() ;
1702
1703    clientFromServerConnector_[client] = new CGathererConnector(clientToServerElement.getView(CElementView::FULL), localElement_->getView(CElementView::WORKFLOW));
1704    clientFromServerConnector_[client]->computeConnector() ;
1705
1706
1707  }
1708
1709  void CAxis::recvAxisDistribution(CEventServer& event)
1710  TRY
1711  {
1712    string axisId;
1713    int phasis ;
1714    for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> axisId >> phasis ;
1715    get(axisId)->receivedAxisDistribution(event, phasis);
1716  }
1717  CATCH
1718
1719
1720  void CAxis::receivedAxisDistribution(CEventServer& event, int phasis)
1721  TRY
1722  {
1723    CContext* context = CContext::getCurrent();
1724    if (phasis==0) // receive the remote element to construct the full view
1725    {
1726      localElement_ = new  CLocalElement(context->getIntraCommRank(),event) ;
1727      localElement_->addFullView() ;
1728      // construct the local dimension and indexes
1729      auto& globalIndex=localElement_->getGlobalIndex() ;
1730      int nk=globalIndex.numElements() ;
1731      int minK=n_glo,maxK=-1 ;
1732      int nGlo=n_glo ;
1733      int indGlo ;
1734      for(int k=0;k<nk;k++)
1735      {
1736        indGlo=globalIndex(k) ;
1737        if (indGlo<minK) minK=indGlo ;
1738        if (indGlo>maxK) maxK=indGlo ;
1739      } 
1740      if (maxK>=minK) { begin=minK ; n=maxK-minK+1 ; }
1741      else {begin=0; n=0 ;}
1742
1743    }
1744    else if (phasis==1) // receive the sent view from client to construct the full distributed full view on server
1745    {
1746      CContext* context = CContext::getCurrent();
1747      CDistributedElement* elementFrom = new  CDistributedElement(event) ;
1748      elementFrom->addFullView() ;
1749      gathererConnector_ = new CGathererConnector(elementFrom->getView(CElementView::FULL), localElement_->getView(CElementView::FULL)) ;
1750      gathererConnector_->computeConnector() ; 
1751    }
1752    else if (phasis==2)
1753    {
1754      delete gathererConnector_ ;
1755      elementFrom_ = new  CDistributedElement(event) ;
1756      elementFrom_->addFullView() ;
1757      gathererConnector_ =  new CGathererConnector(elementFrom_->getView(CElementView::FULL), localElement_->getView(CElementView::FULL)) ;
1758      gathererConnector_ -> computeConnector() ;
1759    }
1760 
1761  }
1762  CATCH
1763
1764  void CAxis::setServerMask(CArray<bool,1>& serverMask, CContextClient* client)
1765  TRY
1766  {
1767    CContext* context = CContext::getCurrent();
1768    localElement_->addView(CElementView::WORKFLOW, serverMask) ;
1769    mask.reference(serverMask.copy()) ;
1770 
1771    serverFromClientConnector_ = new CGathererConnector(elementFrom_->getView(CElementView::FULL), localElement_->getView(CElementView::WORKFLOW)) ;
1772    serverFromClientConnector_->computeConnector() ;
1773     
1774    serverToClientConnector_ = new CScattererConnector(localElement_->getView(CElementView::WORKFLOW), elementFrom_->getView(CElementView::FULL),
1775                                                         context->getIntraComm(), client->getRemoteSize()) ;
1776    serverToClientConnector_->computeConnector() ;
1777  }
1778  CATCH_DUMP_ATTR
1779
1780  void CAxis::sendDistributedAttributes(CContextClient* client, CScattererConnector& scattererConnector, const string& axisId)
1781  {
1782    string serverAxisId = axisId.empty() ? this->getId() : axisId ;
1783    CContext* context = CContext::getCurrent();
1784
1785    if (hasValue)
1786    {
1787      { // send level value
1788        CEventClient event(getType(), EVENT_ID_SEND_DISTRIBUTED_ATTRIBUTE);
1789        CMessage message ;
1790        message<<serverAxisId<<string("value") ; 
1791        scattererConnector.transfer(value, client, event,message) ;
1792      }
1793    }
1794
1795    if (hasBounds)
1796    {
1797      { // send bounds level value
1798        CEventClient event(getType(), EVENT_ID_SEND_DISTRIBUTED_ATTRIBUTE);
1799        CMessage message ;
1800        message<<serverAxisId<<string("bounds") ; 
1801        scattererConnector.transfer(2, bounds, client, event,message) ;
1802      }
1803    }
1804
1805    if (hasLabel)
1806    {
1807      { // send label
1808        CEventClient event(getType(), EVENT_ID_SEND_DISTRIBUTED_ATTRIBUTE);
1809        CMessage message ;
1810        message<<serverAxisId<<string("label") ;
1811        // something to do ? => convert string label into char ?
1812        //clientToServerConnector_[client]->transfer(2, bounds, client, event,message) ;
1813      }
1814    }
1815  }
1816
1817  void CAxis::recvDistributedAttributes(CEventServer& event)
1818  TRY
1819  {
1820    string axisId;
1821    string type ;
1822    for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> axisId >> type ;
1823    get(axisId)->recvDistributedAttributes(event, type);
1824  }
1825  CATCH
1826
1827  void CAxis::recvDistributedAttributes(CEventServer& event, const string& type)
1828  TRY
1829  {
1830    if (type=="value") 
1831    {
1832      gathererConnector_->transfer(event, value, 0.); 
1833    }
1834    else if (type=="bounds")
1835    {
1836      CArray<double,1> value ;
1837      gathererConnector_->transfer(event, 2, value, 0.); 
1838      bounds.resize(2,n) ;
1839      if (bounds.numElements() > 0 ) bounds=CArray<double,2>(bounds.dataFirst(),shape(2,n),neverDeleteData) ; 
1840    }
1841    else if (type=="label")
1842    {
1843       
1844    }
1845  }
1846  CATCH
1847
1848  DEFINE_REF_FUNC(Axis,axis)
1849
1850   ///---------------------------------------------------------------
1851
1852} // namespace xios
Note: See TracBrowser for help on using the repository browser.