source: XIOS/dev/branch_yushan_merged/src/filter/spatial_transform_filter.cpp @ 1203

Last change on this file since 1203 was 1203, checked in by yushan, 4 years ago

prep to merge with trunk @1200

File size: 11.0 KB
Line 
1#include "spatial_transform_filter.hpp"
2#include "grid_transformation.hpp"
3#include "context.hpp"
4#include "context_client.hpp"
5
6namespace xios
7{
8  CSpatialTransformFilter::CSpatialTransformFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, double outputValue, size_t inputSlotsCount)
9    : CFilter(gc, inputSlotsCount, engine), outputDefaultValue(outputValue)
10  { /* Nothing to do */ }
11
12  std::pair<boost::shared_ptr<CSpatialTransformFilter>, boost::shared_ptr<CSpatialTransformFilter> >
13  CSpatialTransformFilter::buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid, bool hasMissingValue, double missingValue)
14  {
15    if (!srcGrid || !destGrid)
16      ERROR("std::pair<boost::shared_ptr<CSpatialTransformFilter>, boost::shared_ptr<CSpatialTransformFilter> >"
17            "buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid)",
18            "Impossible to build the filter graph if either the source or the destination grid are null.");
19
20    boost::shared_ptr<CSpatialTransformFilter> firstFilter, lastFilter;
21    // Note that this loop goes from the last transformation to the first transformation
22    do
23    {
24      CGridTransformation* gridTransformation = destGrid->getTransformations();
25      CSpatialTransformFilterEngine* engine = CSpatialTransformFilterEngine::get(destGrid->getTransformations());
26      const std::vector<StdString>& auxInputs = gridTransformation->getAuxInputs();
27      size_t inputCount = 1 + (auxInputs.empty() ? 0 : auxInputs.size());
28      double defaultValue  = (hasMissingValue) ? std::numeric_limits<double>::quiet_NaN() : 0.0;
29      boost::shared_ptr<CSpatialTransformFilter> filter(new CSpatialTransformFilter(gc, engine, defaultValue, inputCount));
30
31      if (!lastFilter)
32        lastFilter = filter;
33      else
34        filter->connectOutput(firstFilter, 0);
35
36      firstFilter = filter;
37      for (size_t idx = 0; idx < auxInputs.size(); ++idx)
38      {
39        CField* fieldAuxInput = CField::get(auxInputs[idx]);
40        fieldAuxInput->buildFilterGraph(gc, false);
41        fieldAuxInput->getInstantDataFilter()->connectOutput(firstFilter,idx+1);
42      }
43
44      destGrid = gridTransformation->getGridSource();
45    }
46    while (destGrid != srcGrid);
47
48    return std::make_pair(firstFilter, lastFilter);
49  }
50
51  void CSpatialTransformFilter::onInputReady(std::vector<CDataPacketPtr> data)
52  {
53    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
54    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue);
55    if (outputPacket)
56      onOutputReady(outputPacket);
57  }
58
59  CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)
60    : gridTransformation(gridTransformation)
61  {
62    if (!gridTransformation)
63      ERROR("CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)",
64            "Impossible to construct a spatial transform filter engine without a valid grid transformation.");
65  }
66
67  std::map<CGridTransformation*, boost::shared_ptr<CSpatialTransformFilterEngine> > *CSpatialTransformFilterEngine::engines_ptr = 0;
68
69  CSpatialTransformFilterEngine* CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)
70  {
71    if (!gridTransformation)
72      ERROR("CSpatialTransformFilterEngine& CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)",
73            "Impossible to get the requested engine, the grid transformation is invalid.");
74
75    if(engines_ptr == NULL) engines_ptr = new std::map<CGridTransformation*, boost::shared_ptr<CSpatialTransformFilterEngine> >;
76
77    std::map<CGridTransformation*, boost::shared_ptr<CSpatialTransformFilterEngine> >::iterator it = engines_ptr->find(gridTransformation);
78    if (it == engines_ptr->end())
79    {
80      boost::shared_ptr<CSpatialTransformFilterEngine> engine(new CSpatialTransformFilterEngine(gridTransformation));
81      it = engines_ptr->insert(std::make_pair(gridTransformation, engine)).first;
82    }
83
84    return it->second.get();
85  }
86
87  CDataPacketPtr CSpatialTransformFilterEngine::apply(std::vector<CDataPacketPtr> data)
88  {
89    /* Nothing to do */
90  }
91
92  CDataPacketPtr CSpatialTransformFilterEngine::applyFilter(std::vector<CDataPacketPtr> data, double defaultValue)
93  {
94    CDataPacketPtr packet(new CDataPacket);
95    packet->date = data[0]->date;
96    packet->timestamp = data[0]->timestamp;
97    packet->status = data[0]->status;
98
99    if (packet->status == CDataPacket::NO_ERROR)
100    {
101      if (1 < data.size())  // Dynamical transformations
102      {
103        std::vector<CArray<double,1>* > dataAuxInputs(data.size()-1);
104        for (size_t idx = 0; idx < dataAuxInputs.size(); ++idx) dataAuxInputs[idx] = &(data[idx+1]->data);
105        gridTransformation->computeAll(dataAuxInputs, packet->timestamp);
106      }
107      packet->data.resize(gridTransformation->getGridDestination()->storeIndex_client.numElements());
108      if (0 != packet->data.numElements())
109        (packet->data)(0) = defaultValue;
110      apply(data[0]->data, packet->data);
111    }
112
113    return packet;
114  }
115
116  void CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)
117  {
118    CContextClient* client = CContext::getCurrent()->client;
119
120    // Get default value for output data
121    bool ignoreMissingValue = false; 
122    double defaultValue = std::numeric_limits<double>::quiet_NaN();
123    if (0 != dataDest.numElements()) ignoreMissingValue = NumTraits<double>::isnan(dataDest(0));
124
125    const std::list<CGridTransformation::SendingIndexGridSourceMap>& listLocalIndexSend = gridTransformation->getLocalIndexToSendFromGridSource();
126    const std::list<CGridTransformation::RecvIndexGridDestinationMap>& listLocalIndexToReceive = gridTransformation->getLocalIndexToReceiveOnGridDest();
127    const std::list<size_t>& listNbLocalIndexToReceive = gridTransformation->getNbLocalIndexToReceiveOnGridDest();
128    const std::list<std::vector<bool> >& listLocalIndexMaskOnDest = gridTransformation->getLocalMaskIndexOnGridDest();
129    const std::vector<CGenericAlgorithmTransformation*>& listAlgos = gridTransformation->getAlgos();
130
131    CArray<double,1> dataCurrentDest(dataSrc.copy());
132
133    std::list<CGridTransformation::SendingIndexGridSourceMap>::const_iterator itListSend  = listLocalIndexSend.begin(),
134                                                                              iteListSend = listLocalIndexSend.end();
135    std::list<CGridTransformation::RecvIndexGridDestinationMap>::const_iterator itListRecv = listLocalIndexToReceive.begin();
136    std::list<size_t>::const_iterator itNbListRecv = listNbLocalIndexToReceive.begin();
137    std::list<std::vector<bool> >::const_iterator itLocalMaskIndexOnDest = listLocalIndexMaskOnDest.begin();
138    std::vector<CGenericAlgorithmTransformation*>::const_iterator itAlgo = listAlgos.begin();
139
140    for (; itListSend != iteListSend; ++itListSend, ++itListRecv, ++itNbListRecv, ++itLocalMaskIndexOnDest, ++itAlgo)
141    {
142      CArray<double,1> dataCurrentSrc(dataCurrentDest);
143      const CGridTransformation::SendingIndexGridSourceMap& localIndexToSend = *itListSend;
144
145      // Sending data from field sources to do transformations
146      std::map<int, CArray<int,1> >::const_iterator itbSend = localIndexToSend.begin(), itSend,
147                                                    iteSend = localIndexToSend.end();
148      int idxSendBuff = 0;
149      std::vector<double*> sendBuff(localIndexToSend.size());
150      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
151      {
152        if (0 != itSend->second.numElements())
153          sendBuff[idxSendBuff] = new double[itSend->second.numElements()];
154      }
155     
156      const CGridTransformation::RecvIndexGridDestinationMap& localIndexToReceive = *itListRecv;
157      CGridTransformation::RecvIndexGridDestinationMap::const_iterator itbRecv = localIndexToReceive.begin(), itRecv,
158                                                                       iteRecv = localIndexToReceive.end();
159
160      idxSendBuff = 0;
161      std::vector<ep_lib::MPI_Request> sendRecvRequest(localIndexToSend.size()+localIndexToReceive.size());
162      int position = 0;
163      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
164      {
165        int destRank = itSend->first;
166        const CArray<int,1>& localIndex_p = itSend->second;
167        int countSize = localIndex_p.numElements();
168        for (int idx = 0; idx < countSize; ++idx)
169        {
170          sendBuff[idxSendBuff][idx] = dataCurrentSrc(localIndex_p(idx));
171        }
172        MPI_Isend(sendBuff[idxSendBuff], countSize, MPI_DOUBLE, destRank, 12, client->intraComm, &sendRecvRequest[position]);
173        position++;
174      }
175
176      // Receiving data on destination fields
177     
178      int recvBuffSize = 0;
179      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv) recvBuffSize += itRecv->second.size(); //(recvBuffSize < itRecv->second.size())
180                                                                       //? itRecv->second.size() : recvBuffSize;
181      double* recvBuff;
182      if (0 != recvBuffSize) recvBuff = new double[recvBuffSize];
183      int currentBuff = 0;
184      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
185      {
186        int srcRank = itRecv->first;
187        int countSize = itRecv->second.size();
188       
189        MPI_Irecv(recvBuff + currentBuff, countSize, MPI_DOUBLE, srcRank, 12, client->intraComm, &sendRecvRequest[position]);
190        position++;
191        currentBuff += countSize;
192      }
193      std::vector<ep_lib::MPI_Status> status(sendRecvRequest.size());
194      MPI_Waitall(sendRecvRequest.size(), &sendRecvRequest[0], &status[0]);
195
196      dataCurrentDest.resize(*itNbListRecv);
197      const std::vector<bool>& localMaskDest = *itLocalMaskIndexOnDest;
198      for (int i = 0; i < localMaskDest.size(); ++i)
199        if (localMaskDest[i]) dataCurrentDest(i) = 0.0;
200        else dataCurrentDest(i) = defaultValue;
201
202      std::vector<bool> localInitFlag(dataCurrentDest.numElements(), true);
203      currentBuff = 0;
204      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
205      {
206        int countSize = itRecv->second.size();
207        const std::vector<std::pair<int,double> >& localIndex_p = itRecv->second;
208        (*itAlgo)->apply(localIndex_p,
209                         recvBuff+currentBuff,
210                         dataCurrentDest,
211                         localInitFlag,
212                         ignoreMissingValue);
213
214        currentBuff += countSize;
215      }
216
217      (*itAlgo)->updateData(dataCurrentDest);
218
219      idxSendBuff = 0;
220      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
221      {
222        if (0 != itSend->second.numElements())
223          delete [] sendBuff[idxSendBuff];
224      }
225      if (0 != recvBuffSize) delete [] recvBuff;
226    }
227    if (dataCurrentDest.numElements() != dataDest.numElements())
228    ERROR("CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)",
229          "Incoherent between the received size and expected size. " << std::endl
230          << "Expected size: " << dataDest.numElements() << std::endl
231          << "Received size: " << dataCurrentDest.numElements());
232
233    dataDest = dataCurrentDest;
234  }
235} // namespace xios
Note: See TracBrowser for help on using the repository browser.