source: XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_server_source_filter.cpp @ 2230

Last change on this file since 2230 was 2205, checked in by ymipsl, 3 years ago

Fix when checking late data : need to call eventLoop for current context to receive data not alreday sent by server.

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 5.5 KB
Line 
1#include "client_from_server_source_filter.hpp"
2#include "grid.hpp"
3#include "field.hpp"
4#include "file.hpp"
5#include "exception.hpp"
6#include "calendar_util.hpp"
7#include "context.hpp"
8#include "event_client.hpp"
9#include "timer.hpp"
10#include "tracer.hpp"
11#include <limits>
12#include "workflow_graph.hpp"
13
14namespace xios
15{
16  CClientFromServerSourceFilter::CClientFromServerSourceFilter(CGarbageCollector& gc, CField* field)
17    : COutputPin(gc, true)
18  {
19    CContext* context = CContext::getCurrent();
20    field_ = field ;
21    grid_= field->getSentGrid();
22    freqOp_ = field->getRelFile()->output_freq ;
23    client_= field->getRelFile()->getContextClient() ;
24    lastDateReceived_ = context->getCalendar()->getInitDate();
25    offset_ = field->freq_offset ;
26  }
27
28  void CClientFromServerSourceFilter::streamData(CEventServer& event)
29  {
30    const bool wasEOF = isEOF_;
31    int record ;
32    for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> record  ;
33    isEOF_ = (record == int(-1));
34
35   
36    if (wasDataAlreadyReceived_) lastDateReceived_ = lastDateReceived_ + freqOp_ ;
37    else wasDataAlreadyReceived_ = true;
38   
39    CDataPacketPtr packet(new CDataPacket);
40    packet->date = lastDateReceived_ + offset_;
41    packet->timestamp = packet->date ;
42
43    if (isEOF_) 
44    {
45      if (!wasEOF) dateEOF_ = lastDateReceived_;
46      packet->status = CDataPacket::END_OF_STREAM;
47      info(20)<<"Receiv Data from server to client: FieldId : "<<field_->getId()<<endl ;
48      info(20)<<"lastDateReceived_ "<<lastDateReceived_<< "  date "<<packet->date<<"  ----> EOF"<<endl; 
49
50    }
51    else 
52    {
53      CContextClient* client = event.getContextServer()->getAssociatedClient() ;
54      grid_->getClientFromServerConnector(client)->transfer(event, packet->data) ; // to avoid to make a search in map for corresponding client connector,
55     
56      info(20)<<"Receiv Data from server to client: FieldId : "<<field_->getId()<<endl ;
57      info(20)<<"lastDateReceived_ "<<lastDateReceived_<< "  date "<<packet->date<<endl;                                                                                    // make a registration at initialization once
58      packet->status = CDataPacket::NO_ERROR;
59    }
60
61    if(this->graphEnabled)
62    {
63      this->graphPackage->filterId = CWorkflowGraph::getNodeSize();
64      CWorkflowGraph::addNode("Client from Server Source filter", 1, false, 0, packet);
65    }
66    onOutputReady(packet);
67
68  }
69 
70  int CClientFromServerSourceFilter::sendReadDataRequest(const CDate& tsDataRequested)
71  {
72    CContext* context = CContext::getCurrent();
73    lastDataRequestedFromServer_ = tsDataRequested;
74
75    // No need to send the request if we are sure that we are already at EOF
76    if (!isEOF_ || context->getCalendar()->getCurrentDate() <= dateEOF_)
77    {
78      CEventClient event(field_->getType(), CField::EVENT_ID_READ_DATA);
79      if (client_->isServerLeader())
80      {
81        CMessage msg;
82        msg << field_->getId();
83        for(auto& rank : client_->getRanksServerLeader()) event.push(rank, 1, msg);
84        client_->sendEvent(event);
85      }
86      else client_->sendEvent(event);
87    }
88    else 
89    {
90      CDataPacketPtr packet(new CDataPacket);
91      packet->date = tsDataRequested;
92      packet->timestamp = packet->date ;
93      packet->status = CDataPacket::END_OF_STREAM;
94      onOutputReady(packet);
95    }
96
97    wasDataRequestedFromServer_ = true;
98
99    return !isEOF_;
100  }
101
102  bool CClientFromServerSourceFilter::sendReadDataRequestIfNeeded(void)
103  TRY
104  {
105    const CDate& currentDate = CContext::getCurrent()->getCalendar()->getCurrentDate();
106
107    bool dataRequested = false;
108
109    while (currentDate >= lastDataRequestedFromServer_)
110    {
111      info(20) << "currentDate : " << currentDate << endl ;
112      info(20) << "Field : " << field_->getId() << endl ;
113      info(20) << "lastDataRequestedFromServer : " << lastDataRequestedFromServer_ << endl ;
114      info(20) << "freqOp : " << freqOp_ << endl ;
115      info(20) << "lastDataRequestedFromServer + fileIn_->output_freq.getValue() : " << lastDataRequestedFromServer_ + freqOp_ << endl ;
116
117      dataRequested |= sendReadDataRequest(lastDataRequestedFromServer_ + freqOp_);
118    }
119
120    return dataRequested;
121  }
122  CATCH
123
124  void CClientFromServerSourceFilter::checkForLateData(void)
125  TRY
126  {
127    CContext* context = CContext::getCurrent();
128    // Check if data previously requested has been received as expected
129    if (wasDataRequestedFromServer_ && ! isEOF_)
130    {
131      CTimer timer("CClientFromServerSourceFilter::checkForLateDataFromServer");
132      timer.resume();
133      traceOff() ;
134      timer.suspend();
135     
136      bool isLate;
137      do
138      {
139        isLate = isDataLate();
140        if (isLate)
141        {
142          timer.resume();
143          context->globalEventLoop();
144          context->eventLoop() ; 
145          timer.suspend();
146        }
147      }
148      while (isLate && timer.getCumulatedTime() < CXios::recvFieldTimeout);
149      timer.resume();
150      traceOn() ;
151      timer.suspend() ;
152
153
154      if (isLate)
155        ERROR("void CClientFromServerSourceFilter::checkForLateDataFromServer(void)",
156              << "Late data at timestep = " << context->getCalendar()->getCurrentDate());
157    }
158  }
159  CATCH
160
161 
162  bool CClientFromServerSourceFilter::isDataLate(void)
163  {
164    bool isDataLate ;
165    CDate currentDate = CContext::getCurrent()->getCalendar()->getCurrentDate() ;
166
167    const CDate nextDataDue = wasDataAlreadyReceived_ ? (lastDateReceived_ + freqOp_) : CContext::getCurrent()->getCalendar()->getInitDate();
168    isDataLate = (nextDataDue <= currentDate);
169   
170    return isDataLate ; 
171   
172  }
173
174
175} // namespace xios
Note: See TracBrowser for help on using the repository browser.