source: XIOS/dev/dev_olga/src/filter/spatial_transform_filter.cpp @ 1654

Last change on this file since 1654 was 1653, checked in by oabramkina, 5 years ago

Developments for visualization of XIOS workflow.

Branch is spawned from trunk r1649.

Boost library is used for producing Graphviz DOT files. Current results: a DOT file representing a static workflow. For a complete proof of concept, DOT files for each timestamp should be generated. The necessary information has been collected by XIOS, it only requires rearranging the information for graphing (changes in classes CWorkflowGraph and CGraphviz).

File size: 15.3 KB
Line 
1#include "spatial_transform_filter.hpp"
2#include "grid_transformation.hpp"
3#include "context.hpp"
4#include "context_client.hpp"
5#include "timer.hpp"
6#include "workflow_graph.hpp"
7
8namespace xios
9{
10  CSpatialTransformFilter::CSpatialTransformFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine,
11                                                   double outputValue, size_t inputSlotsCount, bool buildWorkflowGraph /*= false*/)
12    : CFilter(gc, inputSlotsCount, engine, buildWorkflowGraph), outputDefaultValue(outputValue)
13  { /* Nothing to do */ }
14
15  std::pair<std::shared_ptr<CSpatialTransformFilter>, std::shared_ptr<CSpatialTransformFilter> >
16  CSpatialTransformFilter::buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid, bool hasMissingValue, double missingValue,
17                                            bool buildWorkflowGraph)
18  {
19    if (!srcGrid || !destGrid)
20      ERROR("std::pair<std::shared_ptr<CSpatialTransformFilter>, std::shared_ptr<CSpatialTransformFilter> >"
21            "buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid)",
22            "Impossible to build the filter graph if either the source or the destination grid are null.");
23
24    std::shared_ptr<CSpatialTransformFilter> firstFilter, lastFilter;
25    // Note that this loop goes from the last transformation to the first transformation
26    do
27    {
28      CGridTransformation* gridTransformation = destGrid->getTransformations();
29      CSpatialTransformFilterEngine* engine = CSpatialTransformFilterEngine::get(destGrid->getTransformations());
30      const std::vector<StdString>& auxInputs = gridTransformation->getAuxInputs();
31      size_t inputCount = 1 + (auxInputs.empty() ? 0 : auxInputs.size());
32      double defaultValue  = (hasMissingValue) ? std::numeric_limits<double>::quiet_NaN() : 0.0;
33
34      const CGridTransformationSelector::ListAlgoType& algoList = gridTransformation->getAlgoList() ;
35      CGridTransformationSelector::ListAlgoType::const_iterator it  ;
36
37      bool isSpatialTemporal=false ;
38      for (it=algoList.begin();it!=algoList.end();++it)  if (it->second.first == TRANS_TEMPORAL_SPLITTING) isSpatialTemporal=true ;
39
40      std::shared_ptr<CSpatialTransformFilter> filter ;
41      if( isSpatialTemporal)
42        filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTemporalFilter(gc, engine, gridTransformation, defaultValue, inputCount, buildWorkflowGraph));
43      else
44        filter = std::shared_ptr<CSpatialTransformFilter>(new CSpatialTransformFilter(gc, engine, defaultValue, inputCount, buildWorkflowGraph));
45
46      if (!lastFilter)
47        lastFilter = filter;
48      else
49      {
50        filter->connectOutput(firstFilter, 0);
51        if (buildWorkflowGraph)
52        {
53          int filterOut = (std::static_pointer_cast<COutputPin>(filter))->getFilterId();
54          int filterIn = (std::static_pointer_cast<COutputPin>(firstFilter))->getFilterId();
55          // PASS field's id here
56          CWorkflowGraph::mapFieldToFilters["XXX"].push_back(filterOut);
57          CWorkflowGraph::mapFieldToFilters["XXX"].push_back(filterIn);
58          CWorkflowGraph::mapFilters[filterOut] = "Spatial transform filter";
59          CWorkflowGraph::mapFilters[filterIn] = "Spatial transform filter";
60        }
61      }
62
63      firstFilter = filter;
64      for (size_t idx = 0; idx < auxInputs.size(); ++idx)
65      {
66        CField* fieldAuxInput = CField::get(auxInputs[idx]);
67        fieldAuxInput->buildFilterGraph(gc, false);
68        fieldAuxInput->getInstantDataFilter()->connectOutput(firstFilter,idx+1);
69      }
70
71      destGrid = gridTransformation->getGridSource();
72    }
73    while (destGrid != srcGrid);
74
75    return std::make_pair(firstFilter, lastFilter);
76  }
77
78  void CSpatialTransformFilter::onInputReady(std::vector<CDataPacketPtr> data)
79  {
80    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
81    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue);
82    if (outputPacket)
83      onOutputReady(outputPacket);
84  }
85
86  CSpatialTemporalFilter::CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine,
87                                                  CGridTransformation* gridTransformation, double outputValue,
88                                                  size_t inputSlotsCount, bool buildWorkflowGraph)
89    : CSpatialTransformFilter(gc, engine, outputValue, inputSlotsCount, buildWorkflowGraph), record(0)
90  {
91      const CGridTransformationSelector::ListAlgoType& algoList = gridTransformation->getAlgoList() ;
92      CGridTransformationSelector::ListAlgoType::const_iterator it  ;
93
94      int pos ;
95      for (it=algoList.begin();it!=algoList.end();++it) 
96        if (it->second.first == TRANS_TEMPORAL_SPLITTING)
97        {
98          pos=it->first ;
99          if (pos < algoList.size()-1)
100            ERROR("SpatialTemporalFilter::CSpatialTemporalFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, CGridTransformation* gridTransformation, double outputValue, size_t inputSlotsCount))",
101                  "temporal splitting operation must be the last of whole transformation on same grid") ;
102        }
103         
104      CGrid* grid=gridTransformation->getGridDestination() ;
105
106      CAxis* axis = grid->getAxis(gridTransformation->getElementPositionInGridDst2AxisPosition().find(pos)->second) ;
107
108      nrecords = axis->index.numElements() ;
109  }
110
111
112  void CSpatialTemporalFilter::onInputReady(std::vector<CDataPacketPtr> data)
113  {
114    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
115    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue);
116
117    if (outputPacket)
118    {
119      size_t nelements=outputPacket->data.numElements() ;
120      if (!tmpData.numElements())
121      {
122        tmpData.resize(nelements);
123        tmpData=outputDefaultValue ;
124      }
125
126      nelements/=nrecords ;
127      size_t offset=nelements*record ;
128      for(size_t i=0;i<nelements;++i)  tmpData(i+offset) = outputPacket->data(i) ;
129   
130      record ++ ;
131      if (record==nrecords)
132      {
133        record=0 ;
134        CDataPacketPtr packet = CDataPacketPtr(new CDataPacket);
135        packet->date = data[0]->date;
136        packet->timestamp = data[0]->timestamp;
137        packet->status = data[0]->status;
138        packet->data.resize(tmpData.numElements());
139        packet->data = tmpData;
140        onOutputReady(packet);
141        tmpData.resize(0) ;
142      }
143    }
144  }
145
146
147  CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)
148    : gridTransformation(gridTransformation)
149  {
150    if (!gridTransformation)
151      ERROR("CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)",
152            "Impossible to construct a spatial transform filter engine without a valid grid transformation.");
153  }
154
155  std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> > CSpatialTransformFilterEngine::engines;
156
157  CSpatialTransformFilterEngine* CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)
158  {
159    if (!gridTransformation)
160      ERROR("CSpatialTransformFilterEngine& CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)",
161            "Impossible to get the requested engine, the grid transformation is invalid.");
162
163    std::map<CGridTransformation*, std::shared_ptr<CSpatialTransformFilterEngine> >::iterator it = engines.find(gridTransformation);
164    if (it == engines.end())
165    {
166      std::shared_ptr<CSpatialTransformFilterEngine> engine(new CSpatialTransformFilterEngine(gridTransformation));
167      it = engines.insert(std::make_pair(gridTransformation, engine)).first;
168    }
169
170    return it->second.get();
171  }
172
173  CDataPacketPtr CSpatialTransformFilterEngine::apply(std::vector<CDataPacketPtr> data)
174  {
175    /* Nothing to do */
176  }
177
178  CDataPacketPtr CSpatialTransformFilterEngine::applyFilter(std::vector<CDataPacketPtr> data, double defaultValue)
179  {
180    CDataPacketPtr packet(new CDataPacket);
181    packet->date = data[0]->date;
182    packet->timestamp = data[0]->timestamp;
183    packet->status = data[0]->status;
184
185    if (packet->status == CDataPacket::NO_ERROR)
186    {
187      if (1 < data.size())  // Dynamical transformations
188      {
189        std::vector<CArray<double,1>* > dataAuxInputs(data.size()-1);
190        for (size_t idx = 0; idx < dataAuxInputs.size(); ++idx) dataAuxInputs[idx] = &(data[idx+1]->data);
191        gridTransformation->computeAll(dataAuxInputs, packet->timestamp);
192      }
193      packet->data.resize(gridTransformation->getGridDestination()->storeIndex_client.numElements());
194      if (0 != packet->data.numElements())
195        (packet->data)(0) = defaultValue;
196      apply(data[0]->data, packet->data);
197    }
198
199    return packet;
200  }
201
202  void CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)
203  {
204    CTimer::get("CSpatialTransformFilterEngine::apply").resume(); 
205   
206    CContextClient* client = CContext::getCurrent()->client;
207    int rank;
208    MPI_Comm_rank (client->intraComm, &rank);
209
210    // Get default value for output data
211    bool ignoreMissingValue = false; 
212    double defaultValue = std::numeric_limits<double>::quiet_NaN();
213    if (0 != dataDest.numElements()) ignoreMissingValue = NumTraits<double>::isNan(dataDest(0));
214
215    const std::list<CGridTransformation::SendingIndexGridSourceMap>& listLocalIndexSend = gridTransformation->getLocalIndexToSendFromGridSource();
216    const std::list<CGridTransformation::RecvIndexGridDestinationMap>& listLocalIndexToReceive = gridTransformation->getLocalIndexToReceiveOnGridDest();
217    const std::list<size_t>& listNbLocalIndexToReceive = gridTransformation->getNbLocalIndexToReceiveOnGridDest();
218    const std::vector<CGenericAlgorithmTransformation*>& listAlgos = gridTransformation->getAlgos();
219
220    CArray<double,1> dataCurrentDest(dataSrc.copy());
221
222    std::list<CGridTransformation::SendingIndexGridSourceMap>::const_iterator itListSend  = listLocalIndexSend.begin(),
223                                                                              iteListSend = listLocalIndexSend.end();
224    std::list<CGridTransformation::RecvIndexGridDestinationMap>::const_iterator itListRecv = listLocalIndexToReceive.begin();
225    std::list<size_t>::const_iterator itNbListRecv = listNbLocalIndexToReceive.begin();
226    std::vector<CGenericAlgorithmTransformation*>::const_iterator itAlgo = listAlgos.begin();
227
228    for (; itListSend != iteListSend; ++itListSend, ++itListRecv, ++itNbListRecv, ++itAlgo)
229    {
230      CArray<double,1> dataCurrentSrc(dataCurrentDest);
231      const CGridTransformation::SendingIndexGridSourceMap& localIndexToSend = *itListSend;
232
233      // Sending data from field sources to do transformations
234      std::map<int, CArray<int,1> >::const_iterator itbSend = localIndexToSend.begin(), itSend,
235                                                    iteSend = localIndexToSend.end();
236      int idxSendBuff = 0;
237      std::vector<double*> sendBuff(localIndexToSend.size());
238      double* sendBuffRank;
239      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
240      {
241        int destRank = itSend->first;
242        if (0 != itSend->second.numElements())
243        {
244          if (rank != itSend->first)
245            sendBuff[idxSendBuff] = new double[itSend->second.numElements()];
246          else
247            sendBuffRank = new double[itSend->second.numElements()];
248        }
249      }
250
251      idxSendBuff = 0;
252      std::vector<MPI_Request> sendRecvRequest;
253      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
254      {
255        int destRank = itSend->first;
256        const CArray<int,1>& localIndex_p = itSend->second;
257        int countSize = localIndex_p.numElements();
258        if (destRank != rank)
259        {
260          for (int idx = 0; idx < countSize; ++idx)
261          {
262            sendBuff[idxSendBuff][idx] = dataCurrentSrc(localIndex_p(idx));
263          }
264          sendRecvRequest.push_back(MPI_Request());
265          MPI_Isend(sendBuff[idxSendBuff], countSize, MPI_DOUBLE, destRank, 12, client->intraComm, &sendRecvRequest.back());
266        }
267        else
268        {
269          for (int idx = 0; idx < countSize; ++idx)
270          {
271            sendBuffRank[idx] = dataCurrentSrc(localIndex_p(idx));
272          }
273        }
274      }
275
276      // Receiving data on destination fields
277      const CGridTransformation::RecvIndexGridDestinationMap& localIndexToReceive = *itListRecv;
278      CGridTransformation::RecvIndexGridDestinationMap::const_iterator itbRecv = localIndexToReceive.begin(), itRecv,
279                                                                       iteRecv = localIndexToReceive.end();
280      int recvBuffSize = 0;
281      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
282      {
283        if (itRecv->first != rank )
284          recvBuffSize += itRecv->second.size();
285      }
286      //(recvBuffSize < itRecv->second.size()) ? itRecv->second.size() : recvBuffSize;
287      double* recvBuff;
288
289      if (0 != recvBuffSize) recvBuff = new double[recvBuffSize];
290      int currentBuff = 0;
291      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
292      {
293        int srcRank = itRecv->first;
294        if (srcRank != rank)
295        {
296          int countSize = itRecv->second.size();
297          sendRecvRequest.push_back(MPI_Request());
298          MPI_Irecv(recvBuff + currentBuff, countSize, MPI_DOUBLE, srcRank, 12, client->intraComm, &sendRecvRequest.back());
299          currentBuff += countSize;
300        }
301      }
302      std::vector<MPI_Status> status(sendRecvRequest.size());
303      MPI_Waitall(sendRecvRequest.size(), &sendRecvRequest[0], &status[0]);
304
305      dataCurrentDest.resize(*itNbListRecv);
306      dataCurrentDest = 0.0;
307
308      std::vector<bool> localInitFlag(dataCurrentDest.numElements(), true);
309      currentBuff = 0;
310      bool firstPass=true; 
311      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
312      {
313        const std::vector<std::pair<int,double> >& localIndex_p = itRecv->second;
314        int srcRank = itRecv->first;
315        if (srcRank != rank)
316        {
317          int countSize = itRecv->second.size();
318          (*itAlgo)->apply(localIndex_p,
319                           recvBuff+currentBuff,
320                           dataCurrentDest,
321                           localInitFlag,
322                           ignoreMissingValue,firstPass);
323          currentBuff += countSize;
324        }
325        else
326        {
327          (*itAlgo)->apply(localIndex_p,
328                           sendBuffRank,
329                           dataCurrentDest,
330                           localInitFlag,
331                           ignoreMissingValue,firstPass);
332        }
333
334        firstPass=false ;
335      }
336
337      (*itAlgo)->updateData(dataCurrentDest);
338
339      idxSendBuff = 0;
340      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
341      {
342        if (0 != itSend->second.numElements())
343        {
344          if (rank != itSend->first)
345            delete [] sendBuff[idxSendBuff];
346          else
347            delete [] sendBuffRank;
348        }
349      }
350      if (0 != recvBuffSize) delete [] recvBuff;
351    }
352    if (dataCurrentDest.numElements() != dataDest.numElements())
353    ERROR("CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)",
354          "Incoherent between the received size and expected size. " << std::endl
355          << "Expected size: " << dataDest.numElements() << std::endl
356          << "Received size: " << dataCurrentDest.numElements());
357
358    dataDest = dataCurrentDest;
359
360    CTimer::get("CSpatialTransformFilterEngine::apply").suspend() ;
361  }
362} // namespace xios
Note: See TracBrowser for help on using the repository browser.