Changeset 1318


Ignore:
Timestamp:
10/26/17 10:23:17 (5 years ago)
Author:
rlacroix
Message:

Fix: Handle end-of-file correctly for files in read mode.

Previously desynchronizations between clients could occur, leading to invalid events being received by the server(s).

Location:
XIOS
Files:
12 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/XIOS_DEV_CMIP6/src/node/context.cpp

    r1316 r1318  
    783783   } 
    784784 
    785    void CContext::checkPrefetchingOfEnabledReadModeFiles() 
     785   void CContext::doPostTimestepOperationsForEnabledReadModeFiles() 
    786786   { 
    787787     int size = enabledReadModeFiles.size(); 
    788788     for (int i = 0; i < size; ++i) 
    789789     { 
    790         enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded(); 
     790        enabledReadModeFiles[i]->doPostTimestepOperationsForEnabledReadModeFields(); 
    791791     } 
    792792   } 
     
    17091709      if (hasClient && !hasServer) // For now we only use server level 1 to read data 
    17101710      { 
    1711         checkPrefetchingOfEnabledReadModeFiles(); 
     1711        doPostTimestepOperationsForEnabledReadModeFiles(); 
    17121712        garbageCollector.invalidate(calendar->getCurrentDate()); 
    17131713      } 
  • XIOS/dev/XIOS_DEV_CMIP6/src/node/context.hpp

    r1239 r1318  
    121121         void buildFilterGraphOfEnabledFields(); 
    122122         void startPrefetchingOfEnabledReadModeFiles(); 
    123          void checkPrefetchingOfEnabledReadModeFiles(); 
     123         void doPostTimestepOperationsForEnabledReadModeFiles(); 
    124124         void findFieldsWithReadAccess(void); 
    125125         void solveAllRefOfFieldsWithReadAccess(); 
  • XIOS/dev/XIOS_DEV_CMIP6/src/node/field.cpp

    r1315 r1318  
    4040      , hasTimeInstant(false) 
    4141      , hasTimeCentered(false) 
     42      , wasDataRequestedFromServer(false) 
    4243      , wasDataAlreadyReceivedFromServer(false) 
    4344      , isEOF(false), nstepMaxRead(false) 
     
    5556      , hasTimeInstant(false) 
    5657      , hasTimeCentered(false) 
     58      , wasDataRequestedFromServer(false) 
    5759      , wasDataAlreadyReceivedFromServer(false) 
    5860      , isEOF(false), nstepMaxRead(false) 
     
    291293    lastDataRequestedFromServer = tsDataRequested; 
    292294 
    293     if (!isEOF) // No need to send the request if we already know we are at EOF 
     295    // No need to send the request if we are sure that we are already at EOF 
     296    if (!isEOF || context->getCalendar()->getCurrentDate() <= dateEOF) 
    294297    { 
    295298      CEventClient event(getType(), EVENT_ID_READ_DATA); 
     
    307310    else 
    308311      serverSourceFilter->signalEndOfStream(tsDataRequested); 
     312 
     313    wasDataRequestedFromServer = true; 
    309314 
    310315    return !isEOF; 
     
    520525  { 
    521526    CContext* context = CContext::getCurrent(); 
    522     int record; 
    523527    std::map<int, CArray<double,1> > data; 
     528    const bool wasEOF = isEOF; 
    524529 
    525530    for (int i = 0; i < ranks.size(); i++) 
    526531    { 
    527532      int rank = ranks[i]; 
     533      int record; 
    528534      *buffers[i] >> record; 
    529535      isEOF = (record == int(-1)); 
     
    544550 
    545551    if (isEOF) 
     552    { 
     553      if (!wasEOF) 
     554        dateEOF = lastDataReceivedFromServer; 
     555 
    546556      serverSourceFilter->signalEndOfStream(lastDataReceivedFromServer); 
     557    } 
    547558    else 
    548559      serverSourceFilter->streamDataFromServer(lastDataReceivedFromServer, data); 
     560  } 
     561 
     562  void CField::checkForLateDataFromServer(void) 
     563  { 
     564    CContext* context = CContext::getCurrent(); 
     565    const CDate& currentDate = context->getCalendar()->getCurrentDate(); 
     566 
     567    // Check if data previously requested has been received as expected 
     568    if (wasDataRequestedFromServer && (!isEOF || context->getCalendar()->getCurrentDate() <= dateEOF)) 
     569    { 
     570      CTimer timer("CField::checkForLateDataFromServer"); 
     571 
     572      bool isDataLate = !wasDataAlreadyReceivedFromServer || lastDataReceivedFromServer + file->output_freq < currentDate; 
     573      while (isDataLate && timer.getCumulatedTime() < CXios::recvFieldTimeout) 
     574      { 
     575        timer.resume(); 
     576 
     577        context->checkBuffersAndListen(); 
     578 
     579        timer.suspend(); 
     580 
     581        isDataLate = !wasDataAlreadyReceivedFromServer || lastDataReceivedFromServer + file->output_freq < currentDate; 
     582      } 
     583 
     584      if (isDataLate) 
     585        ERROR("void CField::checkForLateDataFromServer(void)", 
     586              << "Late data at timestep = " << currentDate); 
     587    } 
    549588  } 
    550589 
  • XIOS/dev/XIOS_DEV_CMIP6/src/node/field.hpp

    r1294 r1318  
    172172        static void recvReadDataReady(CEventServer& event); 
    173173        void recvReadDataReady(vector<int> ranks, vector<CBufferIn*> buffers); 
     174        void checkForLateDataFromServer(void); 
    174175        void outputField(CArray<double,3>& fieldOut); 
    175176        void outputField(CArray<double,2>& fieldOut); 
     
    217218         bool isEOF; 
    218219         CDate lastlast_Write_srv, last_Write_srv, last_operation_srv; 
    219          CDate lastDataRequestedFromServer, lastDataReceivedFromServer; 
    220          bool wasDataAlreadyReceivedFromServer; 
     220         CDate lastDataRequestedFromServer, lastDataReceivedFromServer, dateEOF; 
     221         bool wasDataRequestedFromServer, wasDataAlreadyReceivedFromServer; 
    221222 
    222223         map<int,boost::shared_ptr<func::CFunctor> > foperation_srv; 
  • XIOS/dev/XIOS_DEV_CMIP6/src/node/file.cpp

    r1316 r1318  
    867867 
    868868   /*! 
    869      Prefetching the data for enabled fields read from file whose data is out-of-date. 
    870    */ 
    871    void CFile::prefetchEnabledReadModeFieldsIfNeeded(void) 
     869     Do all post timestep operations for enabled fields in read mode: 
     870      - Prefetch the data read from file when needed 
     871      - Check that the data excepted from server has been received 
     872   */ 
     873   void CFile::doPostTimestepOperationsForEnabledReadModeFields(void) 
    872874   { 
    873875     if (mode.isEmpty() || mode.getValue() != mode_attr::read) 
     
    876878     int size = this->enabledFields.size(); 
    877879     for (int i = 0; i < size; ++i) 
     880     { 
     881       this->enabledFields[i]->checkForLateDataFromServer(); 
    878882       this->enabledFields[i]->sendReadDataRequestIfNeeded(); 
     883     } 
    879884   } 
    880885 
  • XIOS/dev/XIOS_DEV_CMIP6/src/node/file.hpp

    r1239 r1318  
    111111         void buildFilterGraphOfEnabledFields(CGarbageCollector& gc); 
    112112         void prefetchEnabledReadModeFields(); 
    113          void prefetchEnabledReadModeFieldsIfNeeded(); 
     113         void doPostTimestepOperationsForEnabledReadModeFields(); 
    114114 
    115115         void solveAllRefOfEnabledFieldsAndTransform(bool sendToServer); 
  • XIOS/trunk/src/node/context.cpp

    r1200 r1318  
    514514   } 
    515515 
    516    void CContext::checkPrefetchingOfEnabledReadModeFiles() 
     516   void CContext::doPostTimestepOperationsForEnabledReadModeFiles() 
    517517   { 
    518518     int size = enabledReadModeFiles.size(); 
    519519     for (int i = 0; i < size; ++i) 
    520520     { 
    521         enabledReadModeFiles[i]->prefetchEnabledReadModeFieldsIfNeeded(); 
     521        enabledReadModeFiles[i]->doPostTimestepOperationsForEnabledReadModeFields(); 
    522522     } 
    523523   } 
     
    12101210      if (hasClient) 
    12111211      { 
    1212         checkPrefetchingOfEnabledReadModeFiles(); 
     1212        doPostTimestepOperationsForEnabledReadModeFiles(); 
    12131213        garbageCollector.invalidate(calendar->getCurrentDate()); 
    12141214      } 
  • XIOS/trunk/src/node/context.hpp

    r1033 r1318  
    115115         void buildFilterGraphOfEnabledFields(); 
    116116         void startPrefetchingOfEnabledReadModeFiles(); 
    117          void checkPrefetchingOfEnabledReadModeFiles(); 
     117         void doPostTimestepOperationsForEnabledReadModeFiles(); 
    118118         void findFieldsWithReadAccess(void); 
    119119         void solveAllRefOfFieldsWithReadAccess(); 
  • XIOS/trunk/src/node/field.cpp

    r1315 r1318  
    3838      , hasTimeInstant(false) 
    3939      , hasTimeCentered(false) 
     40      , wasDataRequestedFromServer(false) 
    4041      , wasDataAlreadyReceivedFromServer(false) 
    4142      , isEOF(false) 
     
    5253      , hasTimeInstant(false) 
    5354      , hasTimeCentered(false) 
     55      , wasDataRequestedFromServer(false) 
    5456      , wasDataAlreadyReceivedFromServer(false) 
    5557      , isEOF(false) 
     
    261263    lastDataRequestedFromServer = tsDataRequested; 
    262264 
    263     if (!isEOF) // No need to send the request if we already know we are at EOF 
     265    // No need to send the request if we are sure that we are already at EOF 
     266    if (!isEOF || context->getCalendar()->getCurrentDate() <= dateEOF) 
    264267    { 
    265268      CEventClient event(getType(), EVENT_ID_READ_DATA); 
     
    277280    else 
    278281      serverSourceFilter->signalEndOfStream(tsDataRequested); 
     282 
     283    wasDataRequestedFromServer = true; 
    279284 
    280285    return !isEOF; 
     
    433438  { 
    434439    CContext* context = CContext::getCurrent(); 
    435     int record; 
    436440    std::map<int, CArray<double,1> > data; 
     441    const bool wasEOF = isEOF; 
    437442 
    438443    for (int i = 0; i < ranks.size(); i++) 
    439444    { 
    440445      int rank = ranks[i]; 
     446      int record; 
    441447      *buffers[i] >> record; 
    442448      isEOF = (record == int(-1)); 
     
    457463 
    458464    if (isEOF) 
     465    { 
     466      if (!wasEOF) 
     467        dateEOF = lastDataReceivedFromServer; 
     468 
    459469      serverSourceFilter->signalEndOfStream(lastDataReceivedFromServer); 
     470    } 
    460471    else 
    461472      serverSourceFilter->streamDataFromServer(lastDataReceivedFromServer, data); 
     473  } 
     474 
     475  void CField::checkForLateDataFromServer(void) 
     476  { 
     477    CContext* context = CContext::getCurrent(); 
     478    const CDate& currentDate = context->getCalendar()->getCurrentDate(); 
     479 
     480    // Check if data previously requested has been received as expected 
     481    if (wasDataRequestedFromServer && (!isEOF || context->getCalendar()->getCurrentDate() <= dateEOF)) 
     482    { 
     483      CTimer timer("CField::checkForLateDataFromServer"); 
     484 
     485      bool isDataLate = !wasDataAlreadyReceivedFromServer || lastDataReceivedFromServer + file->output_freq < currentDate; 
     486      while (isDataLate && timer.getCumulatedTime() < CXios::recvFieldTimeout) 
     487      { 
     488        timer.resume(); 
     489 
     490        context->checkBuffersAndListen(); 
     491 
     492        timer.suspend(); 
     493 
     494        isDataLate = !wasDataAlreadyReceivedFromServer || lastDataReceivedFromServer + file->output_freq < currentDate; 
     495      } 
     496 
     497      if (isDataLate) 
     498        ERROR("void CField::checkForLateDataFromServer(void)", 
     499              << "Late data at timestep = " << currentDate); 
     500    } 
    462501  } 
    463502 
  • XIOS/trunk/src/node/field.hpp

    r1286 r1318  
    150150        static void recvReadDataReady(CEventServer& event); 
    151151        void recvReadDataReady(vector<int> ranks, vector<CBufferIn*> buffers); 
     152        void checkForLateDataFromServer(void); 
    152153        void outputField(CArray<double,3>& fieldOut); 
    153154        void outputField(CArray<double,2>& fieldOut); 
     
    197198         bool isEOF; 
    198199         CDate lastlast_Write_srv, last_Write_srv, last_operation_srv; 
    199          CDate lastDataRequestedFromServer, lastDataReceivedFromServer; 
    200          bool wasDataAlreadyReceivedFromServer; 
     200         CDate lastDataRequestedFromServer, lastDataReceivedFromServer, dateEOF; 
     201         bool wasDataRequestedFromServer, wasDataAlreadyReceivedFromServer; 
    201202 
    202203         map<int,boost::shared_ptr<func::CFunctor> > foperation_srv; 
  • XIOS/trunk/src/node/file.cpp

    r1286 r1318  
    783783 
    784784   /*! 
    785      Prefetching the data for enabled fields read from file whose data is out-of-date. 
    786    */ 
    787    void CFile::prefetchEnabledReadModeFieldsIfNeeded(void) 
     785     Do all post timestep operations for enabled fields in read mode: 
     786      - Prefetch the data read from file when needed 
     787      - Check that the data excepted from server has been received 
     788   */ 
     789   void CFile::doPostTimestepOperationsForEnabledReadModeFields(void) 
    788790   { 
    789791     if (mode.isEmpty() || mode.getValue() != mode_attr::read) 
     
    792794     int size = this->enabledFields.size(); 
    793795     for (int i = 0; i < size; ++i) 
     796     { 
     797       this->enabledFields[i]->checkForLateDataFromServer(); 
    794798       this->enabledFields[i]->sendReadDataRequestIfNeeded(); 
     799     } 
    795800   } 
    796801 
  • XIOS/trunk/src/node/file.hpp

    r1090 r1318  
    107107         void buildFilterGraphOfEnabledFields(CGarbageCollector& gc); 
    108108         void prefetchEnabledReadModeFields(); 
    109          void prefetchEnabledReadModeFieldsIfNeeded(); 
     109         void doPostTimestepOperationsForEnabledReadModeFields(); 
    110110 
    111111         // Add component into file 
Note: See TracChangeset for help on using the changeset viewer.