source: XIOS/dev/dev_trunk_omp/src/filter/spatial_transform_filter.cpp @ 1661

Last change on this file since 1661 was 1661, checked in by yushan, 2 years ago

MARK: branch merged with trunk @1660. Test (test_complete, test_remap) on ADA with IntelMPI and _usingEP/_usingMPI as switch.

File size: 14.6 KB
Line 
1#include "mpi.hpp"
2#include "spatial_transform_filter.hpp"
3#include "grid_transformation.hpp"
4#include "context.hpp"
5#include "context_client.hpp"
6#include "timer.hpp"
7#ifdef _usingEP
8using namespace ep_lib;
9#endif
10
11namespace xios
12{
13  CSpatialTransformFilter::CSpatialTransformFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, double outputValue, size_t inputSlotsCount)
14    : CFilter(gc, inputSlotsCount, engine), outputDefaultValue(outputValue)
15  { /* Nothing to do */ }
16
17  std::pair<std::shared_ptr<CSpatialTransformFilter>, std::shared_ptr<CSpatialTransformFilter> >
18  CSpatialTransformFilter::buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid, bool hasMissingValue, double missingValue)
19  {
20    if (!srcGrid || !destGrid)
21      ERROR("std::pair<std::shared_ptr<CSpatialTransformFilter>, std::shared_ptr<CSpatialTransformFilter> >"
22            "buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid)",
23            "Impossible to build the filter graph if either the source or the destination grid are null.");
24
25    std::shared_ptr<CSpatialTransformFilter> firstFilter, lastFilter;
26    // Note that this loop goes from the last transformation to the first transformation
27    do
28    {
29      CGridTransformation* gridTransformation = destGrid->getTransformations();
30      CSpatialTransformFilterEngine* engine = CSpatialTransformFilterEngine::get(destGrid->getTransformations());
31      const std::vector<StdString>& auxInputs = gridTransformation->getAuxInputs();
32      size_t inputCount = 1 + (auxInputs.empty() ? 0 : auxInputs.size());
33      double defaultValue  = (hasMissingValue) ? std::numeric_limits<double>::quiet_NaN() : 0.0;
34
35
36      const CGridTransformationSelector::ListAlgoType& algoList = gridTransformation->getAlgoList() ;
37      CGridTransformationSelector::ListAlgoType::const_iterator it  ;
38
39      bool isSpatialTemporal=false ;
40      for (it=algoList.begin();it!=algoList.end();++it)  if (it->second.first == TRANS_TEMPORAL_SPLITTING) isSpatialTemporal=true ;
41
42      std::shared_ptr<CSpatialTransformFilter> filter ;
43      if( isSpatialTemporal) filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTemporalFilter(gc, engine, gridTransformation, defaultValue, inputCount));
44      else filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTransformFilter(gc, engine, defaultValue, inputCount));
45
46     
47      if (!lastFilter)
48        lastFilter = filter;
49      else
50        filter->connectOutput(firstFilter, 0);
51
52      firstFilter = filter;
53      for (size_t idx = 0; idx < auxInputs.size(); ++idx)
54      {
55        CField* fieldAuxInput = CField::get(auxInputs[idx]);
56        fieldAuxInput->buildFilterGraph(gc, false);
57        fieldAuxInput->getInstantDataFilter()->connectOutput(firstFilter,idx+1);
58      }
59
60      destGrid = gridTransformation->getGridSource();
61    }
62    while (destGrid != srcGrid);
63
64    return std::make_pair(firstFilter, lastFilter);
65  }
66
67  void CSpatialTransformFilter::onInputReady(std::vector<CDataPacketPtr> data)
68  {
69    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
70    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue);
71    if (outputPacket)
72      onOutputReady(outputPacket);
73  }
74
75  CSpatialTemporalFilter::CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, CGridTransformation* gridTransformation, double outputValue, size_t inputSlotsCount)
76    : CSpatialTransformFilter(gc, engine, outputValue, inputSlotsCount), record(0)
77  {
78      const CGridTransformationSelector::ListAlgoType& algoList = gridTransformation->getAlgoList() ;
79      CGridTransformationSelector::ListAlgoType::const_iterator it  ;
80
81      int pos ;
82      for (it=algoList.begin();it!=algoList.end();++it) 
83        if (it->second.first == TRANS_TEMPORAL_SPLITTING)
84        {
85          pos=it->first ;
86          if (pos < algoList.size()-1)
87            ERROR("SpatialTemporalFilter::CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, CGridTransformation* gridTransformation, double outputValue, size_t inputSlotsCount))",
88                  "temporal splitting operation must be the last of whole transformation on same grid") ;
89        }
90         
91      CGrid* grid=gridTransformation->getGridDestination() ;
92
93      CAxis* axis = grid->getAxis(gridTransformation->getElementPositionInGridDst2AxisPosition().find(pos)->second) ;
94
95      nrecords = axis->index.numElements() ;
96  }
97
98
99  void CSpatialTemporalFilter::onInputReady(std::vector<CDataPacketPtr> data)
100  {
101    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
102    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue);
103
104    if (outputPacket)
105    {
106      size_t nelements=outputPacket->data.numElements() ;
107      if (!tmpData.numElements())
108      {
109        tmpData.resize(nelements);
110        tmpData=outputDefaultValue ;
111      }
112
113      nelements/=nrecords ;
114      size_t offset=nelements*record ;
115      for(size_t i=0;i<nelements;++i)  tmpData(i+offset) = outputPacket->data(i) ;
116   
117      record ++ ;
118      if (record==nrecords)
119      {
120        record=0 ;
121        CDataPacketPtr packet = CDataPacketPtr(new CDataPacket);
122        packet->date = data[0]->date;
123        packet->timestamp = data[0]->timestamp;
124        packet->status = data[0]->status;
125        packet->data.resize(tmpData.numElements());
126        packet->data = tmpData;
127        onOutputReady(packet);
128        tmpData.resize(0) ;
129      }
130    }
131  }
132
133
134  CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)
135    : gridTransformation(gridTransformation)
136  {
137    if (!gridTransformation)
138      ERROR("CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)",
139            "Impossible to construct a spatial transform filter engine without a valid grid transformation.");
140  }
141
142  std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> > *CSpatialTransformFilterEngine::engines_ptr = 0;
143
144  CSpatialTransformFilterEngine* CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)
145  {
146    if (!gridTransformation)
147      ERROR("CSpatialTransformFilterEngine& CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)",
148            "Impossible to get the requested engine, the grid transformation is invalid.");
149   
150    if(engines_ptr == NULL) engines_ptr = new std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> >;
151
152
153    std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> >::iterator it = engines_ptr->find(gridTransformation);
154    if (it == engines_ptr->end())
155    {
156      std::shared_ptr<CSpatialTransformFilterEngine> engine(new CSpatialTransformFilterEngine(gridTransformation));
157      it = engines_ptr->insert(std::make_pair(gridTransformation, engine)).first;
158    }
159
160    return it->second.get();
161  }
162
163  CDataPacketPtr CSpatialTransformFilterEngine::apply(std::vector<CDataPacketPtr> data)
164  {
165    /* Nothing to do */
166  }
167
168  CDataPacketPtr CSpatialTransformFilterEngine::applyFilter(std::vector<CDataPacketPtr> data, double defaultValue)
169  {
170    CDataPacketPtr packet(new CDataPacket);
171    packet->date = data[0]->date;
172    packet->timestamp = data[0]->timestamp;
173    packet->status = data[0]->status;
174
175    if (packet->status == CDataPacket::NO_ERROR)
176    {
177      if (1 < data.size())  // Dynamical transformations
178      {
179        std::vector<CArray<double,1>* > dataAuxInputs(data.size()-1);
180        for (size_t idx = 0; idx < dataAuxInputs.size(); ++idx) dataAuxInputs[idx] = &(data[idx+1]->data);
181        gridTransformation->computeAll(dataAuxInputs, packet->timestamp);
182      }
183      packet->data.resize(gridTransformation->getGridDestination()->storeIndex_client.numElements());
184      if (0 != packet->data.numElements())
185        (packet->data)(0) = defaultValue;
186      apply(data[0]->data, packet->data);
187    }
188
189    return packet;
190  }
191
192  void CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)
193  {
194    CTimer::get("CSpatialTransformFilterEngine::apply").resume(); 
195   
196    CContextClient* client = CContext::getCurrent()->client;
197    int rank;
198    MPI_Comm_rank (client->intraComm, &rank);
199
200    // Get default value for output data
201    bool ignoreMissingValue = false; 
202    double defaultValue = std::numeric_limits<double>::quiet_NaN();
203    if (0 != dataDest.numElements()) ignoreMissingValue = NumTraits<double>::isNan(dataDest(0));
204
205    const std::list<CGridTransformation::SendingIndexGridSourceMap>& listLocalIndexSend = gridTransformation->getLocalIndexToSendFromGridSource();
206    const std::list<CGridTransformation::RecvIndexGridDestinationMap>& listLocalIndexToReceive = gridTransformation->getLocalIndexToReceiveOnGridDest();
207    const std::list<size_t>& listNbLocalIndexToReceive = gridTransformation->getNbLocalIndexToReceiveOnGridDest();
208    const std::vector<CGenericAlgorithmTransformation*>& listAlgos = gridTransformation->getAlgos();
209
210    CArray<double,1> dataCurrentDest(dataSrc.copy());
211
212    std::list<CGridTransformation::SendingIndexGridSourceMap>::const_iterator itListSend  = listLocalIndexSend.begin(),
213                                                                              iteListSend = listLocalIndexSend.end();
214    std::list<CGridTransformation::RecvIndexGridDestinationMap>::const_iterator itListRecv = listLocalIndexToReceive.begin();
215    std::list<size_t>::const_iterator itNbListRecv = listNbLocalIndexToReceive.begin();
216    std::vector<CGenericAlgorithmTransformation*>::const_iterator itAlgo = listAlgos.begin();
217
218    for (; itListSend != iteListSend; ++itListSend, ++itListRecv, ++itNbListRecv, ++itAlgo)
219    {
220      CArray<double,1> dataCurrentSrc(dataCurrentDest);
221      const CGridTransformation::SendingIndexGridSourceMap& localIndexToSend = *itListSend;
222
223      // Sending data from field sources to do transformations
224      std::map<int, CArray<int,1> >::const_iterator itbSend = localIndexToSend.begin(), itSend,
225                                                    iteSend = localIndexToSend.end();
226      int idxSendBuff = 0;
227      std::vector<double*> sendBuff(localIndexToSend.size());
228      double* sendBuffRank;
229      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
230      {
231        int destRank = itSend->first;
232        if (0 != itSend->second.numElements())
233        {
234          if (rank != itSend->first)
235            sendBuff[idxSendBuff] = new double[itSend->second.numElements()];
236          else
237            sendBuffRank = new double[itSend->second.numElements()];
238        }
239      }
240
241      idxSendBuff = 0;
242      std::vector<MPI_Request> sendRecvRequest(localIndexToSend.size() + itListRecv->size());
243      int position = 0;
244      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
245      {
246        int destRank = itSend->first;
247        const CArray<int,1>& localIndex_p = itSend->second;
248        int countSize = localIndex_p.numElements();
249        if (destRank != rank)
250        {
251          for (int idx = 0; idx < countSize; ++idx)
252          {
253            sendBuff[idxSendBuff][idx] = dataCurrentSrc(localIndex_p(idx));
254          } 
255          MPI_Isend(sendBuff[idxSendBuff], countSize, MPI_DOUBLE, destRank, 12, client->intraComm, &sendRecvRequest[position++]);
256         
257        }
258        else
259        {
260          for (int idx = 0; idx < countSize; ++idx)
261          {
262            sendBuffRank[idx] = dataCurrentSrc(localIndex_p(idx));
263          }
264        }
265      }
266
267      // Receiving data on destination fields
268      const CGridTransformation::RecvIndexGridDestinationMap& localIndexToReceive = *itListRecv;
269      CGridTransformation::RecvIndexGridDestinationMap::const_iterator itbRecv = localIndexToReceive.begin(), itRecv,
270                                                                       iteRecv = localIndexToReceive.end();
271      int recvBuffSize = 0;
272      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
273      {
274        if (itRecv->first != rank )
275          recvBuffSize += itRecv->second.size();
276      }
277      //(recvBuffSize < itRecv->second.size()) ? itRecv->second.size() : recvBuffSize;
278      double* recvBuff;
279
280      if (0 != recvBuffSize) recvBuff = new double[recvBuffSize];
281      int currentBuff = 0;
282      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
283      {
284        int srcRank = itRecv->first;
285        if (srcRank != rank)
286        {
287          int countSize = itRecv->second.size();
288          MPI_Irecv(recvBuff + currentBuff, countSize, MPI_DOUBLE, srcRank, 12, client->intraComm, &sendRecvRequest[position++]);
289          currentBuff += countSize;
290        }
291      }
292      std::vector<MPI_Status> status(sendRecvRequest.size());
293      MPI_Waitall(position, &sendRecvRequest[0], &status[0]);
294
295
296
297      dataCurrentDest.resize(*itNbListRecv);
298      dataCurrentDest = 0.0;
299
300      std::vector<bool> localInitFlag(dataCurrentDest.numElements(), true);
301      currentBuff = 0;
302      bool firstPass=true; 
303      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
304      {
305        const std::vector<std::pair<int,double> >& localIndex_p = itRecv->second;
306        int srcRank = itRecv->first;
307        if (srcRank != rank)
308        {
309          int countSize = itRecv->second.size();
310          (*itAlgo)->apply(localIndex_p,
311                           recvBuff+currentBuff,
312                           dataCurrentDest,
313                           localInitFlag,
314                           ignoreMissingValue,firstPass);
315          currentBuff += countSize;
316        }
317        else
318        {
319          (*itAlgo)->apply(localIndex_p,
320                           sendBuffRank,
321                           dataCurrentDest,
322                           localInitFlag,
323                           ignoreMissingValue,firstPass);
324        }
325
326        firstPass=false ;
327      }
328
329      (*itAlgo)->updateData(dataCurrentDest);
330
331      idxSendBuff = 0;
332      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
333      {
334        if (0 != itSend->second.numElements())
335        {
336          if (rank != itSend->first)
337            delete [] sendBuff[idxSendBuff];
338          else
339            delete [] sendBuffRank;
340        }
341      }
342      if (0 != recvBuffSize) delete [] recvBuff;
343    }
344    if (dataCurrentDest.numElements() != dataDest.numElements())
345    ERROR("CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)",
346          "Incoherent between the received size and expected size. " << std::endl
347          << "Expected size: " << dataDest.numElements() << std::endl
348          << "Received size: " << dataCurrentDest.numElements());
349
350    dataDest = dataCurrentDest;
351
352    CTimer::get("CSpatialTransformFilterEngine::apply").suspend() ;
353  }
354} // namespace xios
Note: See TracBrowser for help on using the repository browser.