source: XIOS/trunk/src/filter/spatial_transform_filter.cpp @ 1704

Last change on this file since 1704 was 1704, checked in by yushan, 5 years ago

Introducing the new graph functionality. Attribute build_workflow_graph=.TRUE. is used in the field definition section in the xml file to enable the workflow graph of the field and other fields referecing to it. A more detailed document will be available soon on the graph fuctionality.

File size: 16.3 KB
Line 
1#include "spatial_transform_filter.hpp"
2#include "grid_transformation.hpp"
3#include "context.hpp"
4#include "context_client.hpp"
5#include "timer.hpp"
6#include "workflow_graph.hpp"
7#include "file.hpp"
8
9namespace xios
10{
11  CSpatialTransformFilter::CSpatialTransformFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, double outputValue, size_t inputSlotsCount)
12    : CFilter(gc, inputSlotsCount, engine), outputDefaultValue(outputValue)
13  { /* Nothing to do */ }
14
15  std::pair<std::shared_ptr<CSpatialTransformFilter>, std::shared_ptr<CSpatialTransformFilter> >
16  CSpatialTransformFilter::buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid, bool hasMissingValue, double missingValue)
17  {
18    if (!srcGrid || !destGrid)
19      ERROR("std::pair<std::shared_ptr<CSpatialTransformFilter>, std::shared_ptr<CSpatialTransformFilter> >"
20            "buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid)",
21            "Impossible to build the filter graph if either the source or the destination grid are null.");
22
23    std::shared_ptr<CSpatialTransformFilter> firstFilter, lastFilter;
24    // Note that this loop goes from the last transformation to the first transformation
25    do
26    {
27      CGridTransformation* gridTransformation = destGrid->getTransformations();
28      CSpatialTransformFilterEngine* engine = CSpatialTransformFilterEngine::get(destGrid->getTransformations());
29      const std::vector<StdString>& auxInputs = gridTransformation->getAuxInputs();
30      size_t inputCount = 1 + (auxInputs.empty() ? 0 : auxInputs.size());
31      double defaultValue  = (hasMissingValue) ? std::numeric_limits<double>::quiet_NaN() : 0.0;
32
33
34      const CGridTransformationSelector::ListAlgoType& algoList = gridTransformation->getAlgoList() ;
35      CGridTransformationSelector::ListAlgoType::const_iterator it  ;
36
37      bool isSpatialTemporal=false ;
38      for (it=algoList.begin();it!=algoList.end();++it)  if (it->second.first == TRANS_TEMPORAL_SPLITTING) isSpatialTemporal=true ;
39
40      std::shared_ptr<CSpatialTransformFilter> filter ;
41      if( isSpatialTemporal) filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTemporalFilter(gc, engine, gridTransformation, defaultValue, inputCount));
42      else filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTransformFilter(gc, engine, defaultValue, inputCount));
43
44     
45      if (!lastFilter)
46        lastFilter = filter;
47      else
48        filter->connectOutput(firstFilter, 0);
49
50      firstFilter = filter;
51      for (size_t idx = 0; idx < auxInputs.size(); ++idx)
52      {
53        CField* fieldAuxInput = CField::get(auxInputs[idx]);
54        fieldAuxInput->buildFilterGraph(gc, false);
55        fieldAuxInput->getInstantDataFilter()->connectOutput(firstFilter,idx+1);
56      }
57
58      destGrid = gridTransformation->getGridSource();
59    }
60    while (destGrid != srcGrid);
61
62    return std::make_pair(firstFilter, lastFilter);
63  }
64
65  void CSpatialTransformFilter::onInputReady(std::vector<CDataPacketPtr> data)
66  {
67    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
68    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue, this->tag, this->start_graph, this->end_graph, this->field);
69    if (outputPacket)
70      onOutputReady(outputPacket);
71  }
72
73  CSpatialTemporalFilter::CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, CGridTransformation* gridTransformation, double outputValue, size_t inputSlotsCount)
74    : CSpatialTransformFilter(gc, engine, outputValue, inputSlotsCount), record(0)
75  {
76      const CGridTransformationSelector::ListAlgoType& algoList = gridTransformation->getAlgoList() ;
77      CGridTransformationSelector::ListAlgoType::const_iterator it  ;
78
79      int pos ;
80      for (it=algoList.begin();it!=algoList.end();++it) 
81        if (it->second.first == TRANS_TEMPORAL_SPLITTING)
82        {
83          pos=it->first ;
84          if (pos < algoList.size()-1)
85            ERROR("SpatialTemporalFilter::CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, CGridTransformation* gridTransformation, double outputValue, size_t inputSlotsCount))",
86                  "temporal splitting operation must be the last of whole transformation on same grid") ;
87        }
88         
89      CGrid* grid=gridTransformation->getGridDestination() ;
90
91      CAxis* axis = grid->getAxis(gridTransformation->getElementPositionInGridDst2AxisPosition().find(pos)->second) ;
92
93      nrecords = axis->index.numElements() ;
94  }
95
96
97  void CSpatialTemporalFilter::onInputReady(std::vector<CDataPacketPtr> data)
98  {
99    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
100    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue, this->tag, this->start_graph, this->end_graph, this->field);
101
102    if (outputPacket)
103    {
104      size_t nelements=outputPacket->data.numElements() ;
105      if (!tmpData.numElements())
106      {
107        tmpData.resize(nelements);
108        tmpData=outputDefaultValue ;
109      }
110
111      nelements/=nrecords ;
112      size_t offset=nelements*record ;
113      for(size_t i=0;i<nelements;++i)  tmpData(i+offset) = outputPacket->data(i) ;
114   
115      record ++ ;
116      if (record==nrecords)
117      {
118        record=0 ;
119        CDataPacketPtr packet = CDataPacketPtr(new CDataPacket);
120        packet->date = data[0]->date;
121        packet->timestamp = data[0]->timestamp;
122        packet->status = data[0]->status;
123        packet->data.resize(tmpData.numElements());
124        packet->data = tmpData;
125        packet->field = this->field;
126        onOutputReady(packet);
127        tmpData.resize(0) ;
128      }
129    }
130  }
131
132
133  CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)
134    : gridTransformation(gridTransformation)
135  {
136    if (!gridTransformation)
137      ERROR("CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)",
138            "Impossible to construct a spatial transform filter engine without a valid grid transformation.");
139  }
140
141  std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> > CSpatialTransformFilterEngine::engines;
142
143  CSpatialTransformFilterEngine* CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)
144  {
145    if (!gridTransformation)
146      ERROR("CSpatialTransformFilterEngine& CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)",
147            "Impossible to get the requested engine, the grid transformation is invalid.");
148
149    std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> >::iterator it = engines.find(gridTransformation);
150    if (it == engines.end())
151    {
152      std::shared_ptr<CSpatialTransformFilterEngine> engine(new CSpatialTransformFilterEngine(gridTransformation));
153      it = engines.insert(std::make_pair(gridTransformation, engine)).first;
154    }
155
156    return it->second.get();
157  }
158
159  CDataPacketPtr CSpatialTransformFilterEngine::apply(std::vector<CDataPacketPtr> data)
160  {
161    /* Nothing to do */
162  }
163
164  bool CSpatialTransformFilterEngine::buildGraph(std::vector<CDataPacketPtr> data, int tag, Time start_graph, Time end_graph, CField *field)
165  {
166    bool building_graph = tag ? data[0]->timestamp >= start_graph && data[0]->timestamp <= end_graph : false;
167    if(building_graph)
168    {
169      this->filterID = InvalidableObject::filterIdGenerator++;
170      int edgeID = InvalidableObject::edgeIdGenerator++;   
171
172      CWorkflowGraph::allocNodeEdge();
173
174      CWorkflowGraph::addNode(this->filterID, "Spatial Transform Filter", 4, 1, 1, data[0]);
175      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1;
176      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = field->record4graphXiosAttributes();
177      if(field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +field->file->record4graphXiosAttributes();
178
179
180      if(CWorkflowGraph::build_begin)
181      {
182        CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]);
183
184        (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0;
185      }
186      else CWorkflowGraph::build_begin = true;
187    }
188
189    return building_graph;
190  }
191
192  CDataPacketPtr CSpatialTransformFilterEngine::applyFilter(std::vector<CDataPacketPtr> data, double defaultValue, int tag, Time start_graph, Time end_graph, CField *field)
193  {
194   
195    bool BG = buildGraph(data, tag, start_graph, end_graph, field);
196
197    CDataPacketPtr packet(new CDataPacket);
198    packet->date = data[0]->date;
199    packet->timestamp = data[0]->timestamp;
200    packet->status = data[0]->status;
201
202    if (packet->status == CDataPacket::NO_ERROR)
203    {
204      if (1 < data.size())  // Dynamical transformations
205      {
206        std::vector<CArray<double,1>* > dataAuxInputs(data.size()-1);
207        for (size_t idx = 0; idx < dataAuxInputs.size(); ++idx) dataAuxInputs[idx] = &(data[idx+1]->data);
208        gridTransformation->computeAll(dataAuxInputs, packet->timestamp);
209      }
210      packet->data.resize(gridTransformation->getGridDestination()->storeIndex_client.numElements());
211      if (0 != packet->data.numElements())
212        (packet->data)(0) = defaultValue;
213      if(BG) apply(data[0]->data, packet->data, this->filterID);
214      else apply(data[0]->data, packet->data);
215    }
216
217    if(BG) packet->src_filterID=this->filterID;
218    if(BG) packet->distance=data[0]->distance+1;
219    packet->field = field;
220
221    return packet;
222  }
223
224  void CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest, int filterID)
225  {
226    CTimer::get("CSpatialTransformFilterEngine::apply").resume(); 
227   
228    CContextClient* client = CContext::getCurrent()->client;
229    int rank;
230    MPI_Comm_rank (client->intraComm, &rank);
231
232    // Get default value for output data
233    bool ignoreMissingValue = false; 
234    double defaultValue = std::numeric_limits<double>::quiet_NaN();
235    if (0 != dataDest.numElements()) ignoreMissingValue = NumTraits<double>::isNan(dataDest(0));
236
237    const std::list<CGridTransformation::SendingIndexGridSourceMap>& listLocalIndexSend = gridTransformation->getLocalIndexToSendFromGridSource();
238    const std::list<CGridTransformation::RecvIndexGridDestinationMap>& listLocalIndexToReceive = gridTransformation->getLocalIndexToReceiveOnGridDest();
239    const std::list<size_t>& listNbLocalIndexToReceive = gridTransformation->getNbLocalIndexToReceiveOnGridDest();
240    const std::vector<CGenericAlgorithmTransformation*>& listAlgos = gridTransformation->getAlgos();
241
242    CArray<double,1> dataCurrentDest(dataSrc.copy());
243
244    std::list<CGridTransformation::SendingIndexGridSourceMap>::const_iterator itListSend  = listLocalIndexSend.begin(),
245                                                                              iteListSend = listLocalIndexSend.end();
246    std::list<CGridTransformation::RecvIndexGridDestinationMap>::const_iterator itListRecv = listLocalIndexToReceive.begin();
247    std::list<size_t>::const_iterator itNbListRecv = listNbLocalIndexToReceive.begin();
248    std::vector<CGenericAlgorithmTransformation*>::const_iterator itAlgo = listAlgos.begin();
249
250    for (; itListSend != iteListSend; ++itListSend, ++itListRecv, ++itNbListRecv, ++itAlgo)
251    {
252      CArray<double,1> dataCurrentSrc(dataCurrentDest);
253      const CGridTransformation::SendingIndexGridSourceMap& localIndexToSend = *itListSend;
254
255      // Sending data from field sources to do transformations
256      std::map<int, CArray<int,1> >::const_iterator itbSend = localIndexToSend.begin(), itSend,
257                                                    iteSend = localIndexToSend.end();
258      int idxSendBuff = 0;
259      std::vector<double*> sendBuff(localIndexToSend.size());
260      double* sendBuffRank;
261      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
262      {
263        int destRank = itSend->first;
264        if (0 != itSend->second.numElements())
265        {
266          if (rank != itSend->first)
267            sendBuff[idxSendBuff] = new double[itSend->second.numElements()];
268          else
269            sendBuffRank = new double[itSend->second.numElements()];
270        }
271      }
272
273      idxSendBuff = 0;
274      std::vector<MPI_Request> sendRecvRequest;
275      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
276      {
277        int destRank = itSend->first;
278        const CArray<int,1>& localIndex_p = itSend->second;
279        int countSize = localIndex_p.numElements();
280        if (destRank != rank)
281        {
282          for (int idx = 0; idx < countSize; ++idx)
283          {
284            sendBuff[idxSendBuff][idx] = dataCurrentSrc(localIndex_p(idx));
285          }
286          sendRecvRequest.push_back(MPI_Request());
287          MPI_Isend(sendBuff[idxSendBuff], countSize, MPI_DOUBLE, destRank, 12, client->intraComm, &sendRecvRequest.back());
288        }
289        else
290        {
291          for (int idx = 0; idx < countSize; ++idx)
292          {
293            sendBuffRank[idx] = dataCurrentSrc(localIndex_p(idx));
294          }
295        }
296      }
297
298      // Receiving data on destination fields
299      const CGridTransformation::RecvIndexGridDestinationMap& localIndexToReceive = *itListRecv;
300      CGridTransformation::RecvIndexGridDestinationMap::const_iterator itbRecv = localIndexToReceive.begin(), itRecv,
301                                                                       iteRecv = localIndexToReceive.end();
302      int recvBuffSize = 0;
303      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
304      {
305        if (itRecv->first != rank )
306          recvBuffSize += itRecv->second.size();
307      }
308      //(recvBuffSize < itRecv->second.size()) ? itRecv->second.size() : recvBuffSize;
309      double* recvBuff;
310
311      if (0 != recvBuffSize) recvBuff = new double[recvBuffSize];
312      int currentBuff = 0;
313      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
314      {
315        int srcRank = itRecv->first;
316        if (srcRank != rank)
317        {
318          int countSize = itRecv->second.size();
319          sendRecvRequest.push_back(MPI_Request());
320          MPI_Irecv(recvBuff + currentBuff, countSize, MPI_DOUBLE, srcRank, 12, client->intraComm, &sendRecvRequest.back());
321          currentBuff += countSize;
322        }
323      }
324      std::vector<MPI_Status> status(sendRecvRequest.size());
325      MPI_Waitall(sendRecvRequest.size(), &sendRecvRequest[0], &status[0]);
326
327      dataCurrentDest.resize(*itNbListRecv);
328      dataCurrentDest = 0.0;
329
330      std::vector<bool> localInitFlag(dataCurrentDest.numElements(), true);
331      currentBuff = 0;
332      bool firstPass=true; 
333      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
334      {
335        const std::vector<std::pair<int,double> >& localIndex_p = itRecv->second;
336        int srcRank = itRecv->first;
337
338        if(filterID >=0) // building_graph
339        {
340           (*CWorkflowGraph::mapFilters_ptr_with_info)[filterID].filter_name = (*itAlgo)->getName();
341        } 
342        if (srcRank != rank)
343        {
344          int countSize = itRecv->second.size();
345          (*itAlgo)->apply(localIndex_p,
346                           recvBuff+currentBuff,
347                           dataCurrentDest,
348                           localInitFlag,
349                           ignoreMissingValue,firstPass);
350          currentBuff += countSize;
351        }
352        else
353        {
354          (*itAlgo)->apply(localIndex_p,
355                           sendBuffRank,
356                           dataCurrentDest,
357                           localInitFlag,
358                           ignoreMissingValue,firstPass);
359        }
360
361        firstPass=false ;
362      }
363
364      (*itAlgo)->updateData(dataCurrentDest);
365
366      idxSendBuff = 0;
367      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
368      {
369        if (0 != itSend->second.numElements())
370        {
371          if (rank != itSend->first)
372            delete [] sendBuff[idxSendBuff];
373          else
374            delete [] sendBuffRank;
375        }
376      }
377      if (0 != recvBuffSize) delete [] recvBuff;
378    }
379    if (dataCurrentDest.numElements() != dataDest.numElements())
380    ERROR("CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)",
381          "Incoherent between the received size and expected size. " << std::endl
382          << "Expected size: " << dataDest.numElements() << std::endl
383          << "Received size: " << dataCurrentDest.numElements());
384
385    dataDest = dataCurrentDest;
386
387    CTimer::get("CSpatialTransformFilterEngine::apply").suspend() ;
388  }
389} // namespace xios
Note: See TracBrowser for help on using the repository browser.