source: XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_server_source_filter.cpp @ 1960

Last change on this file since 1960 was 1934, checked in by ymipsl, 4 years ago

Big update on on going work related to data distribution and transfer between clients and servers.
Revisite of the source and store filter using "connectors".

-> inputs work again

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 5.2 KB
Line 
1#include "client_from_server_source_filter.hpp"
2#include "grid.hpp"
3#include "field.hpp"
4#include "file.hpp"
5#include "exception.hpp"
6#include "calendar_util.hpp"
7#include "context.hpp"
8#include "event_client.hpp"
9#include "timer.hpp"
10#include "tracer.hpp"
11#include <limits>
12
13namespace xios
14{
15  CClientFromServerSourceFilter::CClientFromServerSourceFilter(CGarbageCollector& gc, CField* field)
16    : COutputPin(gc, true)
17  {
18    CContext* context = CContext::getCurrent();
19    field_ = field ;
20    grid_= field->getGrid();
21    freqOp_ = field->getRelFile()->output_freq ;
22    client_= field->getRelFile()->getContextClient() ;
23    lastDateReceived_ = context->getCalendar()->getInitDate();
24    offset_ = field->freq_offset ;
25  }
26
27  void CClientFromServerSourceFilter::streamData(CEventServer& event)
28  {
29    const bool wasEOF = isEOF_;
30    int record ;
31    for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> record  ;
32    isEOF_ = (record == int(-1));
33
34   
35    if (wasDataAlreadyReceived_) lastDateReceived_ = lastDateReceived_ + freqOp_ ;
36    else wasDataAlreadyReceived_ = true;
37   
38    CDataPacketPtr packet(new CDataPacket);
39    packet->date = lastDateReceived_ + offset_;
40    packet->timestamp = packet->date ;
41
42    if (isEOF_) 
43    {
44      if (!wasEOF) dateEOF_ = lastDateReceived_;
45      packet->status = CDataPacket::END_OF_STREAM;
46      info(20)<<"Receiv Data from server to client: FieldId : "<<field_->getId()<<endl ;
47      info(20)<<"lastDateReceived_ "<<lastDateReceived_<< "  date "<<packet->date<<"  ----> EOF"<<endl; 
48
49    }
50    else 
51    {
52      CContextClient* client = event.getContextServer()->getAssociatedClient() ;
53      grid_->getClientFromServerConnector(client)->transfer(event, packet->data) ; // to avoid to make a search in map for corresponding client connector,
54     
55      info(20)<<"Receiv Data from server to client: FieldId : "<<field_->getId()<<endl ;
56      info(20)<<"lastDateReceived_ "<<lastDateReceived_<< "  date "<<packet->date<<endl;                                                                                    // make a registration at initialization once
57      packet->status = CDataPacket::NO_ERROR;
58    }
59    onOutputReady(packet);
60
61  }
62 
63  int CClientFromServerSourceFilter::sendReadDataRequest(const CDate& tsDataRequested)
64  {
65    CContext* context = CContext::getCurrent();
66    lastDataRequestedFromServer_ = tsDataRequested;
67
68    // No need to send the request if we are sure that we are already at EOF
69    if (!isEOF_ || context->getCalendar()->getCurrentDate() <= dateEOF_)
70    {
71      CEventClient event(field_->getType(), CField::EVENT_ID_READ_DATA);
72      if (client_->isServerLeader())
73      {
74        CMessage msg;
75        msg << field_->getId();
76        for(auto& rank : client_->getRanksServerLeader()) event.push(rank, 1, msg);
77        client_->sendEvent(event);
78      }
79      else client_->sendEvent(event);
80    }
81    else 
82    {
83      CDataPacketPtr packet(new CDataPacket);
84      packet->date = tsDataRequested;
85      packet->timestamp = packet->date ;
86      packet->status = CDataPacket::END_OF_STREAM;
87      onOutputReady(packet);
88    }
89
90    wasDataRequestedFromServer_ = true;
91
92    return !isEOF_;
93  }
94
95  bool CClientFromServerSourceFilter::sendReadDataRequestIfNeeded(void)
96  TRY
97  {
98    const CDate& currentDate = CContext::getCurrent()->getCalendar()->getCurrentDate();
99
100    bool dataRequested = false;
101
102    while (currentDate >= lastDataRequestedFromServer_)
103    {
104      info(20) << "currentDate : " << currentDate << endl ;
105      info(20) << "Field : " << field_->getId() << endl ;
106      info(20) << "lastDataRequestedFromServer : " << lastDataRequestedFromServer_ << endl ;
107      info(20) << "freqOp : " << freqOp_ << endl ;
108      info(20) << "lastDataRequestedFromServer + fileIn_->output_freq.getValue() : " << lastDataRequestedFromServer_ + freqOp_ << endl ;
109
110      dataRequested |= sendReadDataRequest(lastDataRequestedFromServer_ + freqOp_);
111    }
112
113    return dataRequested;
114  }
115  CATCH
116
117  void CClientFromServerSourceFilter::checkForLateData(void)
118  TRY
119  {
120    CContext* context = CContext::getCurrent();
121    // Check if data previously requested has been received as expected
122    if (wasDataRequestedFromServer_ && ! isEOF_)
123    {
124      CTimer timer("CClientFromServerSourceFilter::checkForLateDataFromServer");
125      timer.resume();
126      traceOff() ;
127      timer.suspend();
128     
129      bool isLate;
130      do
131      {
132        isLate = isDataLate();
133        if (isLate)
134        {
135          timer.resume();
136          context->globalEventLoop();
137          timer.suspend();
138        }
139      }
140      while (isLate && timer.getCumulatedTime() < CXios::recvFieldTimeout);
141      timer.resume();
142      traceOn() ;
143      timer.suspend() ;
144
145
146      if (isLate)
147        ERROR("void CClientFromServerSourceFilter::checkForLateDataFromServer(void)",
148              << "Late data at timestep = " << context->getCalendar()->getCurrentDate());
149    }
150  }
151  CATCH
152
153 
154  bool CClientFromServerSourceFilter::isDataLate(void)
155  {
156    bool isDataLate ;
157    CDate currentDate = CContext::getCurrent()->getCalendar()->getCurrentDate() ;
158
159    const CDate nextDataDue = wasDataAlreadyReceived_ ? (lastDateReceived_ + freqOp_) : CContext::getCurrent()->getCalendar()->getInitDate();
160    isDataLate = (nextDataDue <= currentDate);
161   
162    return isDataLate ; 
163   
164  }
165
166
167} // namespace xios
Note: See TracBrowser for help on using the repository browser.