source: XIOS/dev/dev_trunk_omp/src/filter/spatial_transform_filter.cpp @ 1646

Last change on this file since 1646 was 1646, checked in by yushan, 5 years ago

branch merged with trunk @1645. arch file (ep&mpi) added for ADA

File size: 14.6 KB
Line 
1#include "mpi.hpp"
2#include "spatial_transform_filter.hpp"
3#include "grid_transformation.hpp"
4#include "context.hpp"
5#include "context_client.hpp"
6#include "timer.hpp"
7#ifdef _usingEP
8using namespace ep_lib;
9#endif
10
11namespace xios
12{
13  CSpatialTransformFilter::CSpatialTransformFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, double outputValue, size_t inputSlotsCount)
14    : CFilter(gc, inputSlotsCount, engine), outputDefaultValue(outputValue)
15  { /* Nothing to do */ }
16
17  std::pair<std::shared_ptr<CSpatialTransformFilter>, std::shared_ptr<CSpatialTransformFilter> >
18  CSpatialTransformFilter::buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid, bool hasMissingValue, double missingValue)
19  {
20    if (!srcGrid || !destGrid)
21      ERROR("std::pair<std::shared_ptr<CSpatialTransformFilter>, std::shared_ptr<CSpatialTransformFilter> >"
22            "buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid)",
23            "Impossible to build the filter graph if either the source or the destination grid are null.");
24
25    std::shared_ptr<CSpatialTransformFilter> firstFilter, lastFilter;
26    // Note that this loop goes from the last transformation to the first transformation
27    do
28    {
29      CGridTransformation* gridTransformation = destGrid->getTransformations();
30      CSpatialTransformFilterEngine* engine = CSpatialTransformFilterEngine::get(destGrid->getTransformations());
31      const std::vector<StdString>& auxInputs = gridTransformation->getAuxInputs();
32      size_t inputCount = 1 + (auxInputs.empty() ? 0 : auxInputs.size());
33      double defaultValue  = (hasMissingValue) ? std::numeric_limits<double>::quiet_NaN() : 0.0;
34
35
36      const CGridTransformationSelector::ListAlgoType& algoList = gridTransformation->getAlgoList() ;
37      CGridTransformationSelector::ListAlgoType::const_iterator it  ;
38
39      bool isSpatialTemporal=false ;
40      for (it=algoList.begin();it!=algoList.end();++it)  if (it->second.first == TRANS_TEMPORAL_SPLITTING) isSpatialTemporal=true ;
41
42      std::shared_ptr<CSpatialTransformFilter> filter ;
43      if( isSpatialTemporal) filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTemporalFilter(gc, engine, gridTransformation, defaultValue, inputCount));
44      else filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTransformFilter(gc, engine, defaultValue, inputCount));
45
46     
47      if (!lastFilter)
48        lastFilter = filter;
49      else
50        filter->connectOutput(firstFilter, 0);
51
52      firstFilter = filter;
53      for (size_t idx = 0; idx < auxInputs.size(); ++idx)
54      {
55        CField* fieldAuxInput = CField::get(auxInputs[idx]);
56        fieldAuxInput->buildFilterGraph(gc, false);
57        fieldAuxInput->getInstantDataFilter()->connectOutput(firstFilter,idx+1);
58      }
59
60      destGrid = gridTransformation->getGridSource();
61    }
62    while (destGrid != srcGrid);
63
64    return std::make_pair(firstFilter, lastFilter);
65  }
66
67  void CSpatialTransformFilter::onInputReady(std::vector<CDataPacketPtr> data)
68  {
69    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
70    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue);
71    if (outputPacket)
72      onOutputReady(outputPacket);
73  }
74
75
76
77
78
79  CSpatialTemporalFilter::CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, CGridTransformation* gridTransformation, double outputValue, size_t inputSlotsCount)
80    : CSpatialTransformFilter(gc, engine, outputValue, inputSlotsCount), record(0)
81  {
82      const CGridTransformationSelector::ListAlgoType& algoList = gridTransformation->getAlgoList() ;
83      CGridTransformationSelector::ListAlgoType::const_iterator it  ;
84
85      int pos ;
86      for (it=algoList.begin();it!=algoList.end();++it) 
87        if (it->second.first == TRANS_TEMPORAL_SPLITTING)
88        {
89          pos=it->first ;
90          if (pos < algoList.size()-1)
91            ERROR("SpatialTemporalFilter::CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, CGridTransformation* gridTransformation, double outputValue, size_t inputSlotsCount))",
92                  "temporal splitting operation must be the last of whole transformation on same grid") ;
93        }
94         
95      CGrid* grid=gridTransformation->getGridDestination() ;
96
97      CAxis* axis = grid->getAxis(gridTransformation->getElementPositionInGridDst2AxisPosition().find(pos)->second) ;
98
99      nrecords = axis->index.numElements() ;
100  }
101
102
103  void CSpatialTemporalFilter::onInputReady(std::vector<CDataPacketPtr> data)
104  {
105    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
106    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue);
107
108    if (outputPacket)
109    {
110      size_t nelements=outputPacket->data.numElements() ;
111      if (!tmpData.numElements())
112      {
113        tmpData.resize(nelements);
114        tmpData=outputDefaultValue ;
115      }
116
117      nelements/=nrecords ;
118      size_t offset=nelements*record ;
119      for(size_t i=0;i<nelements;++i)  tmpData(i+offset) = outputPacket->data(i) ;
120   
121      record ++ ;
122      if (record==nrecords)
123      {
124        record=0 ;
125        CDataPacketPtr packet = CDataPacketPtr(new CDataPacket);
126        packet->date = data[0]->date;
127        packet->timestamp = data[0]->timestamp;
128        packet->status = data[0]->status;
129        packet->data.resize(tmpData.numElements());
130        packet->data = tmpData;
131        onOutputReady(packet);
132        tmpData.resize(0) ;
133      }
134    }
135  }
136
137
138  CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)
139    : gridTransformation(gridTransformation)
140  {
141    if (!gridTransformation)
142      ERROR("CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)",
143            "Impossible to construct a spatial transform filter engine without a valid grid transformation.");
144  }
145
146  std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> > *CSpatialTransformFilterEngine::engines_ptr = 0;
147
148  CSpatialTransformFilterEngine* CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)
149  {
150    if (!gridTransformation)
151      ERROR("CSpatialTransformFilterEngine& CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)",
152            "Impossible to get the requested engine, the grid transformation is invalid.");
153   
154    if(engines_ptr == NULL) engines_ptr = new std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> >;
155
156
157    std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> >::iterator it = engines_ptr->find(gridTransformation);
158    if (it == engines_ptr->end())
159    {
160      std::shared_ptr<CSpatialTransformFilterEngine> engine(new CSpatialTransformFilterEngine(gridTransformation));
161      it = engines_ptr->insert(std::make_pair(gridTransformation, engine)).first;
162    }
163
164    return it->second.get();
165  }
166
167  CDataPacketPtr CSpatialTransformFilterEngine::apply(std::vector<CDataPacketPtr> data)
168  {
169    /* Nothing to do */
170  }
171
172  CDataPacketPtr CSpatialTransformFilterEngine::applyFilter(std::vector<CDataPacketPtr> data, double defaultValue)
173  {
174    CDataPacketPtr packet(new CDataPacket);
175    packet->date = data[0]->date;
176    packet->timestamp = data[0]->timestamp;
177    packet->status = data[0]->status;
178
179    if (packet->status == CDataPacket::NO_ERROR)
180    {
181      if (1 < data.size())  // Dynamical transformations
182      {
183        std::vector<CArray<double,1>* > dataAuxInputs(data.size()-1);
184        for (size_t idx = 0; idx < dataAuxInputs.size(); ++idx) dataAuxInputs[idx] = &(data[idx+1]->data);
185        gridTransformation->computeAll(dataAuxInputs, packet->timestamp);
186      }
187      packet->data.resize(gridTransformation->getGridDestination()->storeIndex_client.numElements());
188      if (0 != packet->data.numElements())
189        (packet->data)(0) = defaultValue;
190      apply(data[0]->data, packet->data);
191    }
192
193    return packet;
194  }
195
196  void CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)
197  {
198    CTimer::get("CSpatialTransformFilterEngine::apply").resume(); 
199   
200    CContextClient* client = CContext::getCurrent()->client;
201    int rank;
202    MPI_Comm_rank (client->intraComm, &rank);
203
204    // Get default value for output data
205    bool ignoreMissingValue = false; 
206    double defaultValue = std::numeric_limits<double>::quiet_NaN();
207    if (0 != dataDest.numElements()) ignoreMissingValue = NumTraits<double>::isNan(dataDest(0));
208
209    const std::list<CGridTransformation::SendingIndexGridSourceMap>& listLocalIndexSend = gridTransformation->getLocalIndexToSendFromGridSource();
210    const std::list<CGridTransformation::RecvIndexGridDestinationMap>& listLocalIndexToReceive = gridTransformation->getLocalIndexToReceiveOnGridDest();
211    const std::list<size_t>& listNbLocalIndexToReceive = gridTransformation->getNbLocalIndexToReceiveOnGridDest();
212    const std::vector<CGenericAlgorithmTransformation*>& listAlgos = gridTransformation->getAlgos();
213
214    CArray<double,1> dataCurrentDest(dataSrc.copy());
215
216    std::list<CGridTransformation::SendingIndexGridSourceMap>::const_iterator itListSend  = listLocalIndexSend.begin(),
217                                                                              iteListSend = listLocalIndexSend.end();
218    std::list<CGridTransformation::RecvIndexGridDestinationMap>::const_iterator itListRecv = listLocalIndexToReceive.begin();
219    std::list<size_t>::const_iterator itNbListRecv = listNbLocalIndexToReceive.begin();
220    std::vector<CGenericAlgorithmTransformation*>::const_iterator itAlgo = listAlgos.begin();
221
222    for (; itListSend != iteListSend; ++itListSend, ++itListRecv, ++itNbListRecv, ++itAlgo)
223    {
224      CArray<double,1> dataCurrentSrc(dataCurrentDest);
225      const CGridTransformation::SendingIndexGridSourceMap& localIndexToSend = *itListSend;
226
227      // Sending data from field sources to do transformations
228      std::map<int, CArray<int,1> >::const_iterator itbSend = localIndexToSend.begin(), itSend,
229                                                    iteSend = localIndexToSend.end();
230      int idxSendBuff = 0;
231      std::vector<double*> sendBuff(localIndexToSend.size());
232      double* sendBuffRank;
233      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
234      {
235        int destRank = itSend->first;
236        if (0 != itSend->second.numElements())
237        {
238          if (rank != itSend->first)
239            sendBuff[idxSendBuff] = new double[itSend->second.numElements()];
240          else
241            sendBuffRank = new double[itSend->second.numElements()];
242        }
243      }
244
245      idxSendBuff = 0;
246      std::vector<MPI_Request> sendRecvRequest(localIndexToSend.size() + itListRecv->size());
247      int position = 0;
248      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
249      {
250        int destRank = itSend->first;
251        const CArray<int,1>& localIndex_p = itSend->second;
252        int countSize = localIndex_p.numElements();
253        if (destRank != rank)
254        {
255          for (int idx = 0; idx < countSize; ++idx)
256          {
257            sendBuff[idxSendBuff][idx] = dataCurrentSrc(localIndex_p(idx));
258          } 
259          MPI_Isend(sendBuff[idxSendBuff], countSize, MPI_DOUBLE, destRank, 12, client->intraComm, &sendRecvRequest[position++]);
260         
261        }
262        else
263        {
264          for (int idx = 0; idx < countSize; ++idx)
265          {
266            sendBuffRank[idx] = dataCurrentSrc(localIndex_p(idx));
267          }
268        }
269      }
270
271      // Receiving data on destination fields
272      const CGridTransformation::RecvIndexGridDestinationMap& localIndexToReceive = *itListRecv;
273      CGridTransformation::RecvIndexGridDestinationMap::const_iterator itbRecv = localIndexToReceive.begin(), itRecv,
274                                                                       iteRecv = localIndexToReceive.end();
275      int recvBuffSize = 0;
276      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
277      {
278        if (itRecv->first != rank )
279          recvBuffSize += itRecv->second.size();
280      }
281      //(recvBuffSize < itRecv->second.size()) ? itRecv->second.size() : recvBuffSize;
282      double* recvBuff;
283
284      if (0 != recvBuffSize) recvBuff = new double[recvBuffSize];
285      int currentBuff = 0;
286      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
287      {
288        int srcRank = itRecv->first;
289        if (srcRank != rank)
290        {
291          int countSize = itRecv->second.size();
292          MPI_Irecv(recvBuff + currentBuff, countSize, MPI_DOUBLE, srcRank, 12, client->intraComm, &sendRecvRequest[position++]);
293          currentBuff += countSize;
294        }
295      }
296      std::vector<MPI_Status> status(sendRecvRequest.size());
297      MPI_Waitall(position, &sendRecvRequest[0], &status[0]);
298
299
300
301      dataCurrentDest.resize(*itNbListRecv);
302      dataCurrentDest = 0.0;
303
304      std::vector<bool> localInitFlag(dataCurrentDest.numElements(), true);
305      currentBuff = 0;
306      bool firstPass=true; 
307      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
308      {
309        const std::vector<std::pair<int,double> >& localIndex_p = itRecv->second;
310        int srcRank = itRecv->first;
311        if (srcRank != rank)
312        {
313          int countSize = itRecv->second.size();
314          (*itAlgo)->apply(localIndex_p,
315                           recvBuff+currentBuff,
316                           dataCurrentDest,
317                           localInitFlag,
318                           ignoreMissingValue,firstPass);
319          currentBuff += countSize;
320        }
321        else
322        {
323          (*itAlgo)->apply(localIndex_p,
324                           sendBuffRank,
325                           dataCurrentDest,
326                           localInitFlag,
327                           ignoreMissingValue,firstPass);
328        }
329
330        firstPass=false ;
331      }
332
333      (*itAlgo)->updateData(dataCurrentDest);
334
335      idxSendBuff = 0;
336      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
337      {
338        if (0 != itSend->second.numElements())
339        {
340          if (rank != itSend->first)
341            delete [] sendBuff[idxSendBuff];
342          else
343            delete [] sendBuffRank;
344        }
345      }
346      if (0 != recvBuffSize) delete [] recvBuff;
347    }
348    if (dataCurrentDest.numElements() != dataDest.numElements())
349    ERROR("CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)",
350          "Incoherent between the received size and expected size. " << std::endl
351          << "Expected size: " << dataDest.numElements() << std::endl
352          << "Received size: " << dataCurrentDest.numElements());
353
354    dataDest = dataCurrentDest;
355
356    CTimer::get("CSpatialTransformFilterEngine::apply").suspend() ;
357  }
358} // namespace xios
Note: See TracBrowser for help on using the repository browser.