source: XIOS/dev/dev_ym/XIOS_COUPLING/src/distribution/element.cpp @ 1960

Last change on this file since 1960 was 1930, checked in by ymipsl, 4 years ago

Big update on on going work related to data distribution and transfer between clients and servers.
Revisite of the source and store filter using "connectors".

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 4.7 KB
Line 
1#include "element.hpp"
2#include "distributed_view.hpp"
3#include "local_view.hpp"
4#include "local_connector.hpp"
5#include "context_client.hpp"
6#include "context_server.hpp"
7#include "mpi.hpp"
8
9
10namespace xios
11{
12
13  CDistributedElement::CDistributedElement(int globalSize, const map<int, CArray<size_t,1>>& globalIndex)
14  {
15    globalSize_= globalSize ;
16    for(auto index : globalIndex) 
17    {
18      globalIndex_[index.first].reference(index.second.copy()) ;
19      localSize_[index.first] = index.second.numElements() ;
20    }
21  }
22
23  CDistributedElement::CDistributedElement(CEventServer& event) 
24  {
25    recvFromClient(event) ;
26  }
27 
28  void CDistributedElement::addView(CElementView::type type, std::map<int, CArray<int,1>>& indexView)
29  {
30    views_[type] = new CDistributedView(this, type, indexView) ;
31  } 
32
33  void CDistributedElement::addView(CElementView::type type, std::map<int, CArray<bool,1>>& maskView)
34  {
35    views_[type] = new CDistributedView(this, type, maskView) ;
36  } 
37
38  void CDistributedElement::addFullView(void)
39  {
40    if (views_[CElementView::FULL]!=nullptr) return ;
41
42    std::map<int, CArray<int,1>> indexView ;
43    for(auto rank : globalIndex_)
44    {
45      auto& index = indexView[rank.first] ;
46      int size=rank.second.numElements() ;
47      index.resize(size) ;
48      for(int i=0;i<size;i++) index(i)=i ;
49    }
50    addView(CElementView::FULL, indexView) ;
51  } 
52
53  void CDistributedElement::sendToServer(CContextClient* client, CEventClient& event, const CMessage& messageHeader)
54  {
55    int remoteSize = client->getRemoteSize() ;
56    vector<int> nbSenders(remoteSize,0) ;
57    for(auto rank : globalIndex_) nbSenders[rank.first]=1 ; 
58    MPI_Allreduce(MPI_IN_PLACE, nbSenders.data(), remoteSize, MPI_INT, MPI_SUM, client->getIntraComm()) ;
59
60    list<CMessage> messages;
61    for(auto ranksData : globalIndex_)
62    {
63      int rank = ranksData.first ;
64      auto& data = ranksData.second ;
65
66      messages.push_back(CMessage(messageHeader));
67      messages.back()<<globalSize_<<data;
68      event.push(rank, nbSenders[rank], messages.back());
69    } 
70    client->sendEvent(event) ; 
71  }
72
73  void CDistributedElement::recvFromClient(CEventServer& event)
74  {
75    globalIndex_.clear();
76    for (auto& subEvent : event.subEvents)
77    {     
78      CBufferIn* buffer = subEvent.buffer;
79      int rank=subEvent.rank ;
80      *buffer>>globalSize_ ;
81      *buffer >> globalIndex_[rank];
82    }
83    localSize_.clear() ;
84    for(auto& globalIndex : globalIndex_) localSize_[globalIndex.first] = globalIndex.second.numElements() ; 
85  }
86 
87
88
89  CLocalElement::CLocalElement(int localRank, size_t globalSize, CArray<size_t,1>& globalIndex) 
90                             : CDistributedElement(globalSize, {{localRank, globalIndex}}),
91                               globalIndex_(CDistributedElement::globalIndex_[localRank]), localSize_(CDistributedElement::localSize_[localRank]), localRank_(localRank)
92  {
93   
94  }   
95
96  CLocalElement::CLocalElement(int localRank, CEventServer& event) : globalIndex_(CDistributedElement::globalIndex_[localRank]), localSize_(CDistributedElement::localSize_[localRank]), localRank_(localRank) 
97  {
98    recvFromClient(localRank, event) ;
99  }
100
101  void CLocalElement::recvFromClient(int localRank, CEventServer& event)
102  {
103    set<size_t> globalIndex ;
104
105    for (auto& subEvent : event.subEvents)
106    {     
107      CBufferIn* buffer = subEvent.buffer;
108      int rank=subEvent.rank ;
109      CArray<size_t,1> indGlo ;
110      *buffer >> globalSize_>> indGlo;
111      globalIndex.insert(indGlo.dataFirst(), indGlo.dataFirst()+indGlo.numElements()) ;
112    }
113
114    localSize_ = globalIndex.size() ;
115    globalIndex_.resize(localSize_) ;
116    int i=0 ;
117    for(auto& ind : globalIndex) { globalIndex_(i)=ind ; i++; }
118  }
119
120  void CLocalElement::addView(CElementView::type type, CArray<int,1>& indexView)
121  {
122    views_[type] = new CLocalView(this, type, indexView) ;
123  } 
124
125  void CLocalElement::addView(CElementView::type type, CArray<bool,1>& maskView)
126  {
127    views_[type] = new CLocalView(this, type, maskView) ;
128  } 
129
130  void CLocalElement::addFullView(void)
131  {
132    if (views_[CElementView::FULL]!=nullptr) return ;
133
134    CArray<int,1> indexView(localSize_) ;
135    for(int i=0;i<localSize_;i++) indexView(i)=i ;
136    addView(CElementView::FULL, indexView) ;
137  } 
138
139  CLocalConnector* CLocalElement::getConnector(CElementView::type srcType, CElementView::type dstType) 
140  { 
141    auto newPair = pair<CElementView::type,CElementView::type>(srcType,dstType);
142    auto it = connectors_.find(newPair) ;
143    if (it==connectors_.end()) 
144    {
145      auto insertPair=pair<pair<CElementView::type,CElementView::type>, CLocalConnector*>(newPair,new CLocalConnector(getView(srcType),getView(dstType))) ;
146      it=connectors_.insert(insertPair).first ;
147      it->second->computeConnector() ;
148    }
149    return it->second ;
150  }
151
152}
Note: See TracBrowser for help on using the repository browser.