source: XIOS/dev/dev_trunk_omp/src/filter/spatial_transform_filter.cpp @ 1685

Last change on this file since 1685 was 1685, checked in by yushan, 2 years ago

dev for graph

File size: 16.7 KB
Line 
1#include "mpi.hpp"
2#include "spatial_transform_filter.hpp"
3#include "grid_transformation.hpp"
4#include "context.hpp"
5#include "context_client.hpp"
6#include "timer.hpp"
7#ifdef _usingEP
8using namespace ep_lib;
9#endif
10#include "workflow_graph.hpp"
11#include "file.hpp"
12namespace xios
13{
14  CSpatialTransformFilter::CSpatialTransformFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine,
15                                                   double outputValue, size_t inputSlotsCount)
16    : CFilter(gc, inputSlotsCount, engine), outputDefaultValue(outputValue)
17  { /* Nothing to do */ }
18
19  std::pair<std::shared_ptr<CSpatialTransformFilter>, std::shared_ptr<CSpatialTransformFilter> >
20  CSpatialTransformFilter::buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid, bool hasMissingValue, double missingValue)
21  {
22    if (!srcGrid || !destGrid)
23      ERROR("std::pair<std::shared_ptr<CSpatialTransformFilter>, std::shared_ptr<CSpatialTransformFilter> >"
24            "buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid)",
25            "Impossible to build the filter graph if either the source or the destination grid are null.");
26
27    std::shared_ptr<CSpatialTransformFilter> firstFilter, lastFilter;
28    // Note that this loop goes from the last transformation to the first transformation
29    do
30    {
31      CGridTransformation* gridTransformation = destGrid->getTransformations();
32      CSpatialTransformFilterEngine* engine = CSpatialTransformFilterEngine::get(destGrid->getTransformations());
33      const std::vector<StdString>& auxInputs = gridTransformation->getAuxInputs();
34      size_t inputCount = 1 + (auxInputs.empty() ? 0 : auxInputs.size());
35      double defaultValue  = (hasMissingValue) ? std::numeric_limits<double>::quiet_NaN() : 0.0;
36
37
38      const CGridTransformationSelector::ListAlgoType& algoList = gridTransformation->getAlgoList() ;
39      CGridTransformationSelector::ListAlgoType::const_iterator it  ;
40
41      bool isSpatialTemporal=false ;
42      for (it=algoList.begin();it!=algoList.end();++it)  if (it->second.first == TRANS_TEMPORAL_SPLITTING) isSpatialTemporal=true ;
43
44      std::shared_ptr<CSpatialTransformFilter> filter ;
45      if( isSpatialTemporal)
46        filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTemporalFilter(gc, engine, gridTransformation, defaultValue, inputCount));
47      else
48        filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTransformFilter(gc, engine, defaultValue, inputCount));
49
50     
51      if (!lastFilter)
52        lastFilter = filter;
53      else
54      {
55        filter->connectOutput(firstFilter, 0);
56      }
57
58      firstFilter = filter;
59      for (size_t idx = 0; idx < auxInputs.size(); ++idx)
60      {
61        CField* fieldAuxInput = CField::get(auxInputs[idx]);
62        fieldAuxInput->buildFilterGraph(gc, false);
63        fieldAuxInput->getInstantDataFilter()->connectOutput(firstFilter,idx+1);
64      }
65
66      destGrid = gridTransformation->getGridSource();
67    }
68    while (destGrid != srcGrid);
69
70    return std::make_pair(firstFilter, lastFilter);
71  }
72
73  void CSpatialTransformFilter::onInputReady(std::vector<CDataPacketPtr> data)
74  {
75
76    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
77    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue, this->tag, this->start_graph, this->end_graph, this->field);
78    if (outputPacket)
79    {
80      onOutputReady(outputPacket);
81    } 
82  }
83
84  CSpatialTemporalFilter::CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine,
85                                                  CGridTransformation* gridTransformation, double outputValue,
86                                                  size_t inputSlotsCount)
87    : CSpatialTransformFilter(gc, engine, outputValue, inputSlotsCount), record(0)
88  {
89      const CGridTransformationSelector::ListAlgoType& algoList = gridTransformation->getAlgoList() ;
90      CGridTransformationSelector::ListAlgoType::const_iterator it  ;
91
92      int pos ;
93      for (it=algoList.begin();it!=algoList.end();++it) 
94        if (it->second.first == TRANS_TEMPORAL_SPLITTING)
95        {
96          pos=it->first ;
97          if (pos < algoList.size()-1)
98            ERROR("SpatialTemporalFilter::CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, CGridTransformation* gridTransformation, double outputValue, size_t inputSlotsCount))",
99                  "temporal splitting operation must be the last of whole transformation on same grid") ;
100        }
101         
102      CGrid* grid=gridTransformation->getGridDestination() ;
103
104      CAxis* axis = grid->getAxis(gridTransformation->getElementPositionInGridDst2AxisPosition().find(pos)->second) ;
105
106      nrecords = axis->index.numElements() ;
107  }
108
109
110  void CSpatialTemporalFilter::onInputReady(std::vector<CDataPacketPtr> data)
111  {
112
113    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
114    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue, this->tag, this->start_graph, this->end_graph, this->field);
115
116    if (outputPacket)
117    {
118      size_t nelements=outputPacket->data.numElements() ;
119      if (!tmpData.numElements())
120      {
121        tmpData.resize(nelements);
122        tmpData=outputDefaultValue ;
123      }
124
125      nelements/=nrecords ;
126      size_t offset=nelements*record ;
127      for(size_t i=0;i<nelements;++i)  tmpData(i+offset) = outputPacket->data(i) ;
128   
129      record ++ ;
130      if (record==nrecords)
131      {
132        record=0 ;
133        CDataPacketPtr packet = CDataPacketPtr(new CDataPacket);
134        packet->date = data[0]->date;
135        packet->timestamp = data[0]->timestamp;
136        packet->status = data[0]->status;
137        packet->data.resize(tmpData.numElements());
138        packet->data = tmpData;
139        packet->field = this->field;
140        onOutputReady(packet);
141        tmpData.resize(0) ;
142      }
143    }
144  }
145
146
147  CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)
148    : gridTransformation(gridTransformation)
149  {
150    if (!gridTransformation)
151      ERROR("CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)",
152            "Impossible to construct a spatial transform filter engine without a valid grid transformation.");
153  }
154
155  std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> > *CSpatialTransformFilterEngine::engines_ptr = 0;
156
157  CSpatialTransformFilterEngine* CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)
158  {
159    if (!gridTransformation)
160      ERROR("CSpatialTransformFilterEngine& CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)",
161            "Impossible to get the requested engine, the grid transformation is invalid.");
162   
163    if(engines_ptr == NULL) engines_ptr = new std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> >;
164
165
166    std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> >::iterator it = engines_ptr->find(gridTransformation);
167    if (it == engines_ptr->end())
168    {
169      std::shared_ptr<CSpatialTransformFilterEngine> engine(new CSpatialTransformFilterEngine(gridTransformation));
170      it = engines_ptr->insert(std::make_pair(gridTransformation, engine)).first;
171    }
172
173    return it->second.get();
174  }
175
176  CDataPacketPtr CSpatialTransformFilterEngine::apply(std::vector<CDataPacketPtr> data)
177  {
178    /* Nothing to do */
179  }
180
181  bool CSpatialTransformFilterEngine::buildGraph(std::vector<CDataPacketPtr> data, int tag, Time start_graph, Time end_graph, CField *field)
182  {
183    bool building_graph = tag ? data[0]->timestamp >= start_graph && data[0]->timestamp <= end_graph : false;
184    if(building_graph)
185    {
186      this->filterID = InvalidableObject::filterIdGenerator++;
187      int edgeID = InvalidableObject::edgeIdGenerator++;   
188
189      CWorkflowGraph::allocNodeEdge();
190
191      CWorkflowGraph::addNode(this->filterID, "Spatial Transform Filter", 4, 1, 1, data[0]);
192      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1;
193      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = field->record4graphXiosAttributes();
194      if(field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +field->file->record4graphXiosAttributes();
195
196
197      if(CWorkflowGraph::build_begin)
198      {
199        CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]);
200
201        (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0;
202      }
203      else CWorkflowGraph::build_begin = true;
204    }
205
206    return building_graph;
207  }
208
209  CDataPacketPtr CSpatialTransformFilterEngine::applyFilter(std::vector<CDataPacketPtr> data, double defaultValue, int tag, Time start_graph, Time end_graph, CField *field)
210  {
211   
212    bool BG = buildGraph(data, tag, start_graph, end_graph, field);
213
214    CDataPacketPtr packet(new CDataPacket);
215    packet->date = data[0]->date;
216    packet->timestamp = data[0]->timestamp;
217    packet->status = data[0]->status;
218   
219
220    if (packet->status == CDataPacket::NO_ERROR)
221    {
222      if (1 < data.size())  // Dynamical transformations
223      {
224        std::vector<CArray<double,1>* > dataAuxInputs(data.size()-1);
225        for (size_t idx = 0; idx < dataAuxInputs.size(); ++idx) dataAuxInputs[idx] = &(data[idx+1]->data);
226        gridTransformation->computeAll(dataAuxInputs, packet->timestamp);
227      }
228      packet->data.resize(gridTransformation->getGridDestination()->storeIndex_client.numElements());
229      if (0 != packet->data.numElements())
230        (packet->data)(0) = defaultValue;
231      if(BG) apply(data[0]->data, packet->data, this->filterID);
232      else apply(data[0]->data, packet->data);
233    }
234
235    if(BG) packet->src_filterID=this->filterID;
236    if(BG) packet->distance=data[0]->distance+1;
237    packet->field = field;
238
239    return packet;
240  }
241
242  void CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest, int filterID)
243  {
244    CTimer::get("CSpatialTransformFilterEngine::apply").resume(); 
245   
246    CContextClient* client = CContext::getCurrent()->client;
247    int rank;
248    MPI_Comm_rank (client->intraComm, &rank);
249
250    // Get default value for output data
251    bool ignoreMissingValue = false; 
252    double defaultValue = std::numeric_limits<double>::quiet_NaN();
253    if (0 != dataDest.numElements()) ignoreMissingValue = NumTraits<double>::isNan(dataDest(0));
254
255    const std::list<CGridTransformation::SendingIndexGridSourceMap>& listLocalIndexSend = gridTransformation->getLocalIndexToSendFromGridSource();
256    const std::list<CGridTransformation::RecvIndexGridDestinationMap>& listLocalIndexToReceive = gridTransformation->getLocalIndexToReceiveOnGridDest();
257    const std::list<size_t>& listNbLocalIndexToReceive = gridTransformation->getNbLocalIndexToReceiveOnGridDest();
258    const std::vector<CGenericAlgorithmTransformation*>& listAlgos = gridTransformation->getAlgos();
259
260    CArray<double,1> dataCurrentDest(dataSrc.copy());
261
262    std::list<CGridTransformation::SendingIndexGridSourceMap>::const_iterator itListSend  = listLocalIndexSend.begin(),
263                                                                              iteListSend = listLocalIndexSend.end();
264    std::list<CGridTransformation::RecvIndexGridDestinationMap>::const_iterator itListRecv = listLocalIndexToReceive.begin();
265    std::list<size_t>::const_iterator itNbListRecv = listNbLocalIndexToReceive.begin();
266    std::vector<CGenericAlgorithmTransformation*>::const_iterator itAlgo = listAlgos.begin();
267
268    for (; itListSend != iteListSend; ++itListSend, ++itListRecv, ++itNbListRecv, ++itAlgo)
269    {
270      CArray<double,1> dataCurrentSrc(dataCurrentDest);
271      const CGridTransformation::SendingIndexGridSourceMap& localIndexToSend = *itListSend;
272
273      // Sending data from field sources to do transformations
274      std::map<int, CArray<int,1> >::const_iterator itbSend = localIndexToSend.begin(), itSend,
275                                                    iteSend = localIndexToSend.end();
276      int idxSendBuff = 0;
277      std::vector<double*> sendBuff(localIndexToSend.size());
278      double* sendBuffRank;
279      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
280      {
281        int destRank = itSend->first;
282        if (0 != itSend->second.numElements())
283        {
284          if (rank != itSend->first)
285            sendBuff[idxSendBuff] = new double[itSend->second.numElements()];
286          else
287            sendBuffRank = new double[itSend->second.numElements()];
288        }
289      }
290
291      idxSendBuff = 0;
292      std::vector<MPI_Request> sendRecvRequest(localIndexToSend.size() + itListRecv->size());
293      int position = 0;
294      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
295      {
296        int destRank = itSend->first;
297        const CArray<int,1>& localIndex_p = itSend->second;
298        int countSize = localIndex_p.numElements();
299        if (destRank != rank)
300        {
301          for (int idx = 0; idx < countSize; ++idx)
302          {
303            sendBuff[idxSendBuff][idx] = dataCurrentSrc(localIndex_p(idx));
304          } 
305          MPI_Isend(sendBuff[idxSendBuff], countSize, MPI_DOUBLE, destRank, 12, client->intraComm, &sendRecvRequest[position++]);
306         
307        }
308        else
309        {
310          for (int idx = 0; idx < countSize; ++idx)
311          {
312            sendBuffRank[idx] = dataCurrentSrc(localIndex_p(idx));
313          }
314        }
315      }
316
317      // Receiving data on destination fields
318      const CGridTransformation::RecvIndexGridDestinationMap& localIndexToReceive = *itListRecv;
319      CGridTransformation::RecvIndexGridDestinationMap::const_iterator itbRecv = localIndexToReceive.begin(), itRecv,
320                                                                       iteRecv = localIndexToReceive.end();
321      int recvBuffSize = 0;
322      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
323      {
324        if (itRecv->first != rank )
325          recvBuffSize += itRecv->second.size();
326      }
327      //(recvBuffSize < itRecv->second.size()) ? itRecv->second.size() : recvBuffSize;
328      double* recvBuff;
329
330      if (0 != recvBuffSize) recvBuff = new double[recvBuffSize];
331      int currentBuff = 0;
332      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
333      {
334        int srcRank = itRecv->first;
335        if (srcRank != rank)
336        {
337          int countSize = itRecv->second.size();
338          MPI_Irecv(recvBuff + currentBuff, countSize, MPI_DOUBLE, srcRank, 12, client->intraComm, &sendRecvRequest[position++]);
339          currentBuff += countSize;
340        }
341      }
342      std::vector<MPI_Status> status(sendRecvRequest.size());
343      MPI_Waitall(position, &sendRecvRequest[0], &status[0]);
344
345
346
347      dataCurrentDest.resize(*itNbListRecv);
348      dataCurrentDest = 0.0;
349
350      std::vector<bool> localInitFlag(dataCurrentDest.numElements(), true);
351      currentBuff = 0;
352      bool firstPass=true; 
353      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
354      {
355        const std::vector<std::pair<int,double> >& localIndex_p = itRecv->second;
356        int srcRank = itRecv->first;
357
358        if(filterID >=0) // building_graph
359        {
360           (*CWorkflowGraph::mapFilters_ptr_with_info)[filterID].filter_name = (*itAlgo)->getName();
361        } 
362        if (srcRank != rank)
363        {
364          int countSize = itRecv->second.size();
365          (*itAlgo)->apply(localIndex_p,
366                           recvBuff+currentBuff,
367                           dataCurrentDest,
368                           localInitFlag,
369                           ignoreMissingValue,firstPass);
370          currentBuff += countSize;
371        }
372        else
373        {
374          (*itAlgo)->apply(localIndex_p,
375                           sendBuffRank,
376                           dataCurrentDest,
377                           localInitFlag,
378                           ignoreMissingValue,firstPass);
379        }
380
381        firstPass=false ;
382      }
383
384      (*itAlgo)->updateData(dataCurrentDest);
385
386      idxSendBuff = 0;
387      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
388      {
389        if (0 != itSend->second.numElements())
390        {
391          if (rank != itSend->first)
392            delete [] sendBuff[idxSendBuff];
393          else
394            delete [] sendBuffRank;
395        }
396      }
397      if (0 != recvBuffSize) delete [] recvBuff;
398    }
399    if (dataCurrentDest.numElements() != dataDest.numElements())
400    ERROR("CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)",
401          "Incoherent between the received size and expected size. " << std::endl
402          << "Expected size: " << dataDest.numElements() << std::endl
403          << "Received size: " << dataCurrentDest.numElements());
404
405    dataDest = dataCurrentDest;
406
407    CTimer::get("CSpatialTransformFilterEngine::apply").suspend() ;
408  }
409} // namespace xios
Note: See TracBrowser for help on using the repository browser.