source: XIOS/trunk/src/filter/spatial_transform_filter.cpp @ 1006

Last change on this file since 1006 was 1006, checked in by rlacroix, 4 years ago

The workflow is now triggered when using xios_recv_field for fields in read mode received from the servers.

Previously the workflow was triggered upon receiving the data which could cause deadlocks since there are no garanties that clients are receiving data at the same time.

File size: 10.5 KB
Line 
1#include "spatial_transform_filter.hpp"
2#include "grid_transformation.hpp"
3#include "context.hpp"
4#include "context_client.hpp"
5
6namespace xios
7{
8  CSpatialTransformFilter::CSpatialTransformFilter(CGarbageCollector& gc, CSpatialTransformFilterEngine* engine, double outputValue, size_t inputSlotsCount)
9    : CFilter(gc, inputSlotsCount, engine), outputDefaultValue(outputValue)
10  { /* Nothing to do */ }
11
12  std::pair<boost::shared_ptr<CSpatialTransformFilter>, boost::shared_ptr<CSpatialTransformFilter> >
13  CSpatialTransformFilter::buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid, double defaultValue)
14  {
15    if (!srcGrid || !destGrid)
16      ERROR("std::pair<boost::shared_ptr<CSpatialTransformFilter>, boost::shared_ptr<CSpatialTransformFilter> >"
17            "buildFilterGraph(CGarbageCollector& gc, CGrid* srcGrid, CGrid* destGrid)",
18            "Impossible to build the filter graph if either the source or the destination grid are null.");
19
20    boost::shared_ptr<CSpatialTransformFilter> firstFilter, lastFilter;
21    // Note that this loop goes from the last transformation to the first transformation
22    do
23    {
24      CGridTransformation* gridTransformation = destGrid->getTransformations();
25      CSpatialTransformFilterEngine* engine = CSpatialTransformFilterEngine::get(destGrid->getTransformations());
26      const std::vector<StdString>& auxInputs = gridTransformation->getAuxInputs();
27      size_t inputCount = 1 + (auxInputs.empty() ? 0 : auxInputs.size());
28      boost::shared_ptr<CSpatialTransformFilter> filter(new CSpatialTransformFilter(gc, engine, defaultValue, inputCount));
29
30      if (!lastFilter)
31        lastFilter = filter;
32      else
33        filter->connectOutput(firstFilter, 0);
34
35      firstFilter = filter;
36      for (size_t idx = 0; idx < auxInputs.size(); ++idx)
37      {
38        CField* fieldAuxInput = CField::get(auxInputs[idx]);
39        fieldAuxInput->buildFilterGraph(gc, false);
40        fieldAuxInput->getInstantDataFilter()->connectOutput(firstFilter,idx+1);
41      }
42
43      destGrid = gridTransformation->getGridSource();
44    }
45    while (destGrid != srcGrid);
46
47    return std::make_pair(firstFilter, lastFilter);
48  }
49
50  void CSpatialTransformFilter::onInputReady(std::vector<CDataPacketPtr> data)
51  {
52    CSpatialTransformFilterEngine* spaceFilter = static_cast<CSpatialTransformFilterEngine*>(engine);
53    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue);
54    if (outputPacket)
55      onOutputReady(outputPacket);
56  }
57
58  CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)
59    : gridTransformation(gridTransformation)
60  {
61    if (!gridTransformation)
62      ERROR("CSpatialTransformFilterEngine::CSpatialTransformFilterEngine(CGridTransformation* gridTransformation)",
63            "Impossible to construct a spatial transform filter engine without a valid grid transformation.");
64  }
65
66  std::map<CGridTransformation*, boost::shared_ptr<CSpatialTransformFilterEngine> > CSpatialTransformFilterEngine::engines;
67
68  CSpatialTransformFilterEngine* CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)
69  {
70    if (!gridTransformation)
71      ERROR("CSpatialTransformFilterEngine& CSpatialTransformFilterEngine::get(CGridTransformation* gridTransformation)",
72            "Impossible to get the requested engine, the grid transformation is invalid.");
73
74    std::map<CGridTransformation*, boost::shared_ptr<CSpatialTransformFilterEngine> >::iterator it = engines.find(gridTransformation);
75    if (it == engines.end())
76    {
77      boost::shared_ptr<CSpatialTransformFilterEngine> engine(new CSpatialTransformFilterEngine(gridTransformation));
78      it = engines.insert(std::make_pair(gridTransformation, engine)).first;
79    }
80
81    return it->second.get();
82  }
83
84  CDataPacketPtr CSpatialTransformFilterEngine::apply(std::vector<CDataPacketPtr> data)
85  {
86    /* Nothing to do */
87  }
88
89  CDataPacketPtr CSpatialTransformFilterEngine::applyFilter(std::vector<CDataPacketPtr> data, double defaultValue)
90  {
91    CDataPacketPtr packet(new CDataPacket);
92    packet->date = data[0]->date;
93    packet->timestamp = data[0]->timestamp;
94    packet->status = data[0]->status;
95
96    if (packet->status == CDataPacket::NO_ERROR)
97    {
98      if (1 < data.size())  // Dynamical transformations
99      {
100        std::vector<CArray<double,1>* > dataAuxInputs(data.size()-1);
101        for (size_t idx = 0; idx < dataAuxInputs.size(); ++idx) dataAuxInputs[idx] = &(data[idx+1]->data);
102        gridTransformation->computeAll(dataAuxInputs, packet->timestamp);
103      }
104      packet->data.resize(gridTransformation->getGridDestination()->storeIndex_client.numElements());
105      packet->data = defaultValue;
106      apply(data[0]->data, packet->data);
107    }
108
109    return packet;
110  }
111
112  void CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)
113  {
114    CContextClient* client = CContext::getCurrent()->client;
115
116    // Get default value for output data
117    double defaultValue = 0.0;
118    if (0 != dataDest.numElements()) defaultValue = dataDest(0);
119
120    const std::list<CGridTransformation::SendingIndexGridSourceMap>& listLocalIndexSend = gridTransformation->getLocalIndexToSendFromGridSource();
121    const std::list<CGridTransformation::RecvIndexGridDestinationMap>& listLocalIndexToReceive = gridTransformation->getLocalIndexToReceiveOnGridDest();
122    const std::list<size_t>& listNbLocalIndexToReceive = gridTransformation->getNbLocalIndexToReceiveOnGridDest();
123    const std::list<std::vector<bool> >& listLocalIndexMaskOnDest = gridTransformation->getLocalMaskIndexOnGridDest();
124    const std::vector<CGenericAlgorithmTransformation*>& listAlgos = gridTransformation->getAlgos();
125
126    CArray<double,1> dataCurrentDest(dataSrc.copy());
127
128    std::list<CGridTransformation::SendingIndexGridSourceMap>::const_iterator itListSend  = listLocalIndexSend.begin(),
129                                                                              iteListSend = listLocalIndexSend.end();
130    std::list<CGridTransformation::RecvIndexGridDestinationMap>::const_iterator itListRecv = listLocalIndexToReceive.begin();
131    std::list<size_t>::const_iterator itNbListRecv = listNbLocalIndexToReceive.begin();
132    std::list<std::vector<bool> >::const_iterator itLocalMaskIndexOnDest = listLocalIndexMaskOnDest.begin();
133    std::vector<CGenericAlgorithmTransformation*>::const_iterator itAlgo = listAlgos.begin();
134
135    for (; itListSend != iteListSend; ++itListSend, ++itListRecv, ++itNbListRecv, ++itLocalMaskIndexOnDest, ++itAlgo)
136    {
137      CArray<double,1> dataCurrentSrc(dataCurrentDest);
138      const CGridTransformation::SendingIndexGridSourceMap& localIndexToSend = *itListSend;
139
140      // Sending data from field sources to do transformations
141      std::map<int, CArray<int,1> >::const_iterator itbSend = localIndexToSend.begin(), itSend,
142                                                    iteSend = localIndexToSend.end();
143      int idxSendBuff = 0;
144      std::vector<double*> sendBuff(localIndexToSend.size());
145      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
146      {
147        if (0 != itSend->second.numElements())
148          sendBuff[idxSendBuff] = new double[itSend->second.numElements()];
149      }
150
151      idxSendBuff = 0;
152      std::vector<MPI_Request> sendRecvRequest;
153      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
154      {
155        int destRank = itSend->first;
156        const CArray<int,1>& localIndex_p = itSend->second;
157        int countSize = localIndex_p.numElements();
158        for (int idx = 0; idx < countSize; ++idx)
159        {
160          sendBuff[idxSendBuff][idx] = dataCurrentSrc(localIndex_p(idx));
161        }
162        sendRecvRequest.push_back(MPI_Request());
163        MPI_Isend(sendBuff[idxSendBuff], countSize, MPI_DOUBLE, destRank, 12, client->intraComm, &sendRecvRequest.back());
164      }
165
166      // Receiving data on destination fields
167      const CGridTransformation::RecvIndexGridDestinationMap& localIndexToReceive = *itListRecv;
168      CGridTransformation::RecvIndexGridDestinationMap::const_iterator itbRecv = localIndexToReceive.begin(), itRecv,
169                                                                       iteRecv = localIndexToReceive.end();
170      int recvBuffSize = 0;
171      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv) recvBuffSize += itRecv->second.size(); //(recvBuffSize < itRecv->second.size())
172                                                                       //? itRecv->second.size() : recvBuffSize;
173      double* recvBuff;
174      if (0 != recvBuffSize) recvBuff = new double[recvBuffSize];
175      int currentBuff = 0;
176      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
177      {
178        int srcRank = itRecv->first;
179        int countSize = itRecv->second.size();
180        sendRecvRequest.push_back(MPI_Request());
181        MPI_Irecv(recvBuff + currentBuff, countSize, MPI_DOUBLE, srcRank, 12, client->intraComm, &sendRecvRequest.back());
182        currentBuff += countSize;
183      }
184      std::vector<MPI_Status> status(sendRecvRequest.size());
185      MPI_Waitall(sendRecvRequest.size(), &sendRecvRequest[0], &status[0]);
186
187      dataCurrentDest.resize(*itNbListRecv);
188      const std::vector<bool>& localMaskDest = *itLocalMaskIndexOnDest;
189      for (int i = 0; i < localMaskDest.size(); ++i)
190        if (localMaskDest[i]) dataCurrentDest(i) = 0.0;
191        else dataCurrentDest(i) = defaultValue;
192
193      std::vector<bool> localInitFlag(dataCurrentDest.size(), true);
194      currentBuff = 0;
195      for (itRecv = itbRecv; itRecv != iteRecv; ++itRecv)
196      {
197        int countSize = itRecv->second.size();
198        const std::vector<std::pair<int,double> >& localIndex_p = itRecv->second;
199        (*itAlgo)->apply(localIndex_p,
200                         recvBuff+currentBuff,
201                         dataCurrentDest,
202                         localInitFlag,
203                         defaultValue);
204
205        currentBuff += countSize;
206      }
207
208      (*itAlgo)->updateData(dataCurrentDest);
209
210      idxSendBuff = 0;
211      for (itSend = itbSend; itSend != iteSend; ++itSend, ++idxSendBuff)
212      {
213        if (0 != itSend->second.numElements())
214          delete [] sendBuff[idxSendBuff];
215      }
216      if (0 != recvBuffSize) delete [] recvBuff;
217    }
218    if (dataCurrentDest.numElements() != dataDest.numElements())
219    ERROR("CSpatialTransformFilterEngine::apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest)",
220          "Incoherent between the received size and expected size. " << std::endl
221          << "Expected size: " << dataDest.numElements() << std::endl
222          << "Received size: " << dataCurrentDest.numElements());
223
224    dataDest = dataCurrentDest;
225  }
226} // namespace xios
Note: See TracBrowser for help on using the repository browser.