source: XIOS/dev/dev_trunk_omp/src/filter/spatial_transform_filter.cpp @ 1677

Last change on this file since 1677 was 1677, checked in by yushan, 5 years ago

MARK: Dynamic workflow graph developement. Branch up to date with trunk @1663.

File size: 16.2 KB
Line 
1#include "mpi.hpp"
2#include "spatial_transform_filter.hpp"
3#include "grid_transformation.hpp"
4#include "context.hpp"
5#include "context_client.hpp"
6#include "timer.hpp"
7#ifdef _usingEP
8using namespace ep_lib;
9#endif
10#include "workflow_graph.hpp"
11namespace xios
12{
13  CSpatialTransformFilter::CSpatialTransformFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine,
14                                                   double outputValue, size_t inputSlotsCount, bool buildWorkflowGraph /*= false*/)
15    : CFilter(gc, inputSlotsCount, engine, buildWorkflowGraph), outputDefaultValue(outputValue)
16  { /* Nothing to do */ }
17
18  std::pair<std::shared_ptr<CSpatialTransformFilter>, std::shared_ptr<CSpatialTransformFilter> >
19  CSpatialTransformFilter::buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid, bool hasMissingValue, double missingValue,
20                                            bool buildWorkflowGraph)
21  {
22    if (!srcGrid || !destGrid)
23      ERROR("std::pair<std::shared_ptr<CSpatialTransformFilter>, std::shared_ptr<CSpatialTransformFilter> >"
24            "buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid)",
25            "Impossible to build the filter graph if either the source or the destination grid are null.");
26
27    std::shared_ptr<CSpatialTransformFilter> firstFilter, lastFilter;
28    // Note that this loop goes from the last transformation to the first transformation
29    do
30    {
31      CGridTransformation* gridTransformation = destGrid->getTransformations();
32      CSpatialTransformFilterEngine* engine = CSpatialTransformFilterEngine::get(destGrid->getTransformations());
33      const std::vector<StdString>& auxInputs = gridTransformation->getAuxInputs();
34      size_t inputCount = 1 + (auxInputs.empty() ? 0 : auxInputs.size());
35      double defaultValue  = (hasMissingValue) ? std::numeric_limits<double>::quiet_NaN() : 0.0;
36
37
38      const CGridTransformationSelector::ListAlgoType& algoList = gridTransformation->getAlgoList() ;
39      CGridTransformationSelector::ListAlgoType::const_iterator it  ;
40
41      bool isSpatialTemporal=false ;
42      for (it=algoList.begin();it!=algoList.end();++it)  if (it->second.first == TRANS_TEMPORAL_SPLITTING) isSpatialTemporal=true ;
43
44      std::shared_ptr<CSpatialTransformFilter> filter ;
45      if( isSpatialTemporal)
46        filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTemporalFilter(gc, engine, gridTransformation, defaultValue, inputCount, buildWorkflowGraph));
47      else
48        filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTransformFilter(gc, engine, defaultValue, inputCount, buildWorkflowGraph));
49
50     
51      if (!lastFilter)
52        lastFilter = filter;
53      else
54      {
55        filter->connectOutput(firstFilter, 0);
56      }
57
58      firstFilter = filter;
59      for (size_t idx = 0; idx < auxInputs.size(); ++idx)
60      {
61        CField* fieldAuxInput = CField::get(auxInputs[idx]);
62        fieldAuxInput->buildFilterGraph(gc, false);
63        fieldAuxInput->getInstantDataFilter()->connectOutput(firstFilter,idx+1);
64      }
65
66      destGrid = gridTransformation->getGridSource();
67    }
68    while (destGrid != srcGrid);
69
70    return std::make_pair(firstFilter, lastFilter);
71  }
72
73  void CSpatialTransformFilter::onInputReady(std::vector<CDataPacketPtr> data)
74  {
75    // if(CXios::isClient) std::cout<<"CSpatialTransformFilter onInputReady"<<std::endl;
76
77    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
78    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue, this->tag, this->output_field_id);
79    if (outputPacket)
80    {
81      // std::cout<<"Spatial Transform Filter onOutputReady"<<std::endl;
82      onOutputReady(outputPacket);
83    } 
84  }
85
86  CSpatialTemporalFilter::CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine,
87                                                  CGridTransformation* gridTransformation, double outputValue,
88                                                  size_t inputSlotsCount, bool buildWorkflowGraph)
89    : CSpatialTransformFilter(gc, engine, outputValue, inputSlotsCount, buildWorkflowGraph), record(0)
90  {
91      const CGridTransformationSelector::ListAlgoType& algoList = gridTransformation->getAlgoList() ;
92      CGridTransformationSelector::ListAlgoType::const_iterator it  ;
93
94      int pos ;
95      for (it=algoList.begin();it!=algoList.end();++it) 
96        if (it->second.first == TRANS_TEMPORAL_SPLITTING)
97        {
98          pos=it->first ;
99          if (pos < algoList.size()-1)
100            ERROR("SpatialTemporalFilter::CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, CGridTransformation* gridTransformation, double outputValue, size_t inputSlotsCount))",
101                  "temporal splitting operation must be the last of whole transformation on same grid") ;
102        }
103         
104      CGrid* grid=gridTransformation->getGridDestination() ;
105
106      CAxis* axis = grid->getAxis(gridTransformation->getElementPositionInGridDst2AxisPosition().find(pos)->second) ;
107
108      nrecords = axis->index.numElements() ;
109  }
110
111
112  void CSpatialTemporalFilter::onInputReady(std::vector<CDataPacketPtr> data)
113  {
114    if(CXios::isClient) std::cout<<"CSpatialTemporalFilter onInputReady"<<std::endl;
115
116    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
117    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue, this->tag, this->output_field_id);
118
119    if (outputPacket)
120    {
121      size_t nelements=outputPacket->data.numElements() ;
122      if (!tmpData.numElements())
123      {
124        tmpData.resize(nelements);
125        tmpData=outputDefaultValue ;
126      }
127
128      nelements/=nrecords ;
129      size_t offset=nelements*record ;
130      for(size_t i=0;i<nelements;++i)  tmpData(i+offset) = outputPacket->data(i) ;
131   
132      record ++ ;
133      if (record==nrecords)
134      {
135        record=0 ;
136        CDataPacketPtr packet = CDataPacketPtr(new CDataPacket);
137        packet->date = data[0]->date;
138        packet->timestamp = data[0]->timestamp;
139        packet->status = data[0]->status;
140        packet->data.resize(tmpData.numElements());
141        packet->data = tmpData;
142        std::cout<<"Spatial temporal filter onOutputReady"<<std::endl;
143        onOutputReady(packet);
144        tmpData.resize(0) ;
145      }
146    }
147  }
148
149
150  CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)
151    : gridTransformation(gridTransformation)
152  {
153    if (!gridTransformation)
154      ERROR("CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)",
155            "Impossible to construct a spatial transform filter engine without a valid grid transformation.");
156  }
157
158  std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> > *CSpatialTransformFilterEngine::engines_ptr = 0;
159
160  CSpatialTransformFilterEngine* CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)
161  {
162    if (!gridTransformation)
163      ERROR("CSpatialTransformFilterEngine& CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)",
164            "Impossible to get the requested engine, the grid transformation is invalid.");
165   
166    if(engines_ptr == NULL) engines_ptr = new std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> >;
167
168
169    std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> >::iterator it = engines_ptr->find(gridTransformation);
170    if (it == engines_ptr->end())
171    {
172      std::shared_ptr<CSpatialTransformFilterEngine> engine(new CSpatialTransformFilterEngine(gridTransformation));
173      it = engines_ptr->insert(std::make_pair(gridTransformation, engine)).first;
174    }
175
176    return it->second.get();
177  }
178
179  CDataPacketPtr CSpatialTransformFilterEngine::apply(std::vector<CDataPacketPtr> data)
180  {
181    /* Nothing to do */
182  }
183
184  CDataPacketPtr CSpatialTransformFilterEngine::applyFilter(std::vector<CDataPacketPtr> data, double defaultValue, int tag, StdString fieldID)
185  {
186    if(tag)
187    {
188      this->filterID = InvalidableObject::filterIdGenerator++;
189   
190      std::cout<<"CSpatialTransformFilter::apply filter tag = "<<tag<<std::endl;
191   
192
193      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>;
194
195      (*CWorkflowGraph::mapFilters_ptr)[this->filterID] = "Spatial Transform Filter";
196
197      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >;
198
199      StdString str = data[0]->fieldID + " ts=" + to_string(data[0]->timestamp);
200      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(data[0]->src_filterID);
201      (*CWorkflowGraph::mapFieldToFilters_ptr)[str].push_back(this->filterID);
202    }
203
204
205    CDataPacketPtr packet(new CDataPacket);
206    packet->date = data[0]->date;
207    packet->timestamp = data[0]->timestamp;
208    packet->status = data[0]->status;
209
210    if (packet->status == CDataPacket::NO_ERROR)
211    {
212      if (1 < data.size())  // Dynamical transformations
213      {
214        std::vector<CArray<double,1>* > dataAuxInputs(data.size()-1);
215        for (size_t idx = 0; idx < dataAuxInputs.size(); ++idx) dataAuxInputs[idx] = &(data[idx+1]->data);
216        gridTransformation->computeAll(dataAuxInputs, packet->timestamp);
217      }
218      packet->data.resize(gridTransformation->getGridDestination()->storeIndex_client.numElements());
219      if (0 != packet->data.numElements())
220        (packet->data)(0) = defaultValue;
221      apply(data[0]->data, packet->data);
222    }
223
224    if(tag) packet->src_filterID=this->filterID;
225    packet->fieldID=fieldID;
226
227    return packet;
228  }
229
230  void CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)
231  {
232    CTimer::get("CSpatialTransformFilterEngine::apply").resume(); 
233   
234    CContextClient* client = CContext::getCurrent()->client;
235    int rank;
236    MPI_Comm_rank (client->intraComm, &rank);
237
238    // Get default value for output data
239    bool ignoreMissingValue = false; 
240    double defaultValue = std::numeric_limits<double>::quiet_NaN();
241    if (0 != dataDest.numElements()) ignoreMissingValue = NumTraits<double>::isNan(dataDest(0));
242
243    const std::list<CGridTransformation::SendingIndexGridSourceMap>& listLocalIndexSend = gridTransformation->getLocalIndexToSendFromGridSource();
244    const std::list<CGridTransformation::RecvIndexGridDestinationMap>& listLocalIndexToReceive = gridTransformation->getLocalIndexToReceiveOnGridDest();
245    const std::list<size_t>& listNbLocalIndexToReceive = gridTransformation->getNbLocalIndexToReceiveOnGridDest();
246    const std::vector<CGenericAlgorithmTransformation*>& listAlgos = gridTransformation->getAlgos();
247
248    CArray<double,1> dataCurrentDest(dataSrc.copy());
249
250    std::list<CGridTransformation::SendingIndexGridSourceMap>::const_iterator itListSend  = listLocalIndexSend.begin(),
251                                                                              iteListSend = listLocalIndexSend.end();
252    std::list<CGridTransformation::RecvIndexGridDestinationMap>::const_iterator itListRecv = listLocalIndexToReceive.begin();
253    std::list<size_t>::const_iterator itNbListRecv = listNbLocalIndexToReceive.begin();
254    std::vector<CGenericAlgorithmTransformation*>::const_iterator itAlgo = listAlgos.begin();
255
256    for (; itListSend != iteListSend; ++itListSend, ++itListRecv, ++itNbListRecv, ++itAlgo)
257    {
258      CArray<double,1> dataCurrentSrc(dataCurrentDest);
259      const CGridTransformation::SendingIndexGridSourceMap& localIndexToSend = *itListSend;
260
261      // Sending data from field sources to do transformations
262      std::map<int, CArray<int,1> >::const_iterator itbSend = localIndexToSend.begin(), itSend,
263                                                    iteSend = localIndexToSend.end();
264      int idxSendBuff = 0;
265      std::vector<double*> sendBuff(localIndexToSend.size());
266      double* sendBuffRank;
267      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
268      {
269        int destRank = itSend->first;
270        if (0 != itSend->second.numElements())
271        {
272          if (rank != itSend->first)
273            sendBuff[idxSendBuff] = new double[itSend->second.numElements()];
274          else
275            sendBuffRank = new double[itSend->second.numElements()];
276        }
277      }
278
279      idxSendBuff = 0;
280      std::vector<MPI_Request> sendRecvRequest(localIndexToSend.size() + itListRecv->size());
281      int position = 0;
282      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
283      {
284        int destRank = itSend->first;
285        const CArray<int,1>& localIndex_p = itSend->second;
286        int countSize = localIndex_p.numElements();
287        if (destRank != rank)
288        {
289          for (int idx = 0; idx < countSize; ++idx)
290          {
291            sendBuff[idxSendBuff][idx] = dataCurrentSrc(localIndex_p(idx));
292          } 
293          MPI_Isend(sendBuff[idxSendBuff], countSize, MPI_DOUBLE, destRank, 12, client->intraComm, &sendRecvRequest[position++]);
294         
295        }
296        else
297        {
298          for (int idx = 0; idx < countSize; ++idx)
299          {
300            sendBuffRank[idx] = dataCurrentSrc(localIndex_p(idx));
301          }
302        }
303      }
304
305      // Receiving data on destination fields
306      const CGridTransformation::RecvIndexGridDestinationMap& localIndexToReceive = *itListRecv;
307      CGridTransformation::RecvIndexGridDestinationMap::const_iterator itbRecv = localIndexToReceive.begin(), itRecv,
308                                                                       iteRecv = localIndexToReceive.end();
309      int recvBuffSize = 0;
310      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
311      {
312        if (itRecv->first != rank )
313          recvBuffSize += itRecv->second.size();
314      }
315      //(recvBuffSize < itRecv->second.size()) ? itRecv->second.size() : recvBuffSize;
316      double* recvBuff;
317
318      if (0 != recvBuffSize) recvBuff = new double[recvBuffSize];
319      int currentBuff = 0;
320      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
321      {
322        int srcRank = itRecv->first;
323        if (srcRank != rank)
324        {
325          int countSize = itRecv->second.size();
326          MPI_Irecv(recvBuff + currentBuff, countSize, MPI_DOUBLE, srcRank, 12, client->intraComm, &sendRecvRequest[position++]);
327          currentBuff += countSize;
328        }
329      }
330      std::vector<MPI_Status> status(sendRecvRequest.size());
331      MPI_Waitall(position, &sendRecvRequest[0], &status[0]);
332
333
334
335      dataCurrentDest.resize(*itNbListRecv);
336      dataCurrentDest = 0.0;
337
338      std::vector<bool> localInitFlag(dataCurrentDest.numElements(), true);
339      currentBuff = 0;
340      bool firstPass=true; 
341      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
342      {
343        const std::vector<std::pair<int,double> >& localIndex_p = itRecv->second;
344        int srcRank = itRecv->first;
345        if (srcRank != rank)
346        {
347          int countSize = itRecv->second.size();
348          (*itAlgo)->apply(localIndex_p,
349                           recvBuff+currentBuff,
350                           dataCurrentDest,
351                           localInitFlag,
352                           ignoreMissingValue,firstPass);
353          currentBuff += countSize;
354        }
355        else
356        {
357          (*itAlgo)->apply(localIndex_p,
358                           sendBuffRank,
359                           dataCurrentDest,
360                           localInitFlag,
361                           ignoreMissingValue,firstPass);
362        }
363
364        firstPass=false ;
365      }
366
367      (*itAlgo)->updateData(dataCurrentDest);
368
369      idxSendBuff = 0;
370      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
371      {
372        if (0 != itSend->second.numElements())
373        {
374          if (rank != itSend->first)
375            delete [] sendBuff[idxSendBuff];
376          else
377            delete [] sendBuffRank;
378        }
379      }
380      if (0 != recvBuffSize) delete [] recvBuff;
381    }
382    if (dataCurrentDest.numElements() != dataDest.numElements())
383    ERROR("CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)",
384          "Incoherent between the received size and expected size. " << std::endl
385          << "Expected size: " << dataDest.numElements() << std::endl
386          << "Received size: " << dataCurrentDest.numElements());
387
388    dataDest = dataCurrentDest;
389
390    CTimer::get("CSpatialTransformFilterEngine::apply").suspend() ;
391  }
392} // namespace xios
Note: See TracBrowser for help on using the repository browser.