Ignore:
Timestamp:
03/22/18 10:43:20 (6 years ago)
Author:
yushan
Message:

branch_openmp merged with XIOS_DEV_CMIP6@1459

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/branch_openmp/src/node/file.cpp

    r1338 r1460  
    1717#include "mpi.hpp" 
    1818#include "timer.hpp" 
    19  
     19#include "server.hpp" 
    2020 
    2121namespace xios { 
     
    2626      : CObjectTemplate<CFile>(), CFileAttributes() 
    2727      , vFieldGroup(), data_out(), enabledFields() 
    28       , allDomainEmpty(false), isOpen(false) 
     28      , isOpen(false), read_client(0), checkRead(false), allZoneEmpty(false) 
    2929   { 
    3030     setVirtualFieldGroup(CFieldGroup::create(getId() + "_virtual_field_group")); 
     
    3535      : CObjectTemplate<CFile>(id), CFileAttributes() 
    3636      , vFieldGroup(), data_out(), enabledFields() 
    37       , allDomainEmpty(false), isOpen(false) 
     37      , isOpen(false), read_client(0), checkRead(false), allZoneEmpty(false) 
    3838    { 
    3939      setVirtualFieldGroup(CFieldGroup::create(getId() + "_virtual_field_group")); 
     
    208208 
    209209   //! Initialize a file in order to write into it 
    210    void CFile::initFile(void) 
     210   void CFile::initWrite(void) 
    211211   { 
    212212      CContext* context = CContext::getCurrent(); 
     
    218218      if (!split_freq.isEmpty()) 
    219219      { 
    220         StdString keySuffix("CContext_"+CContext::getCurrent()->getId()+"::CFile_"+getFileOutputName()+"::") ;  
     220        StdString keySuffix("CFile::"+getFileOutputName()+"::") ;  
    221221        if (context->registryIn->foundKey(keySuffix+"splitStart") && context->registryIn->foundKey(keySuffix+"splitEnd")) 
    222222        { 
     
    229229        } 
    230230      } 
    231       isOpen = false; 
    232  
    233       allDomainEmpty = true; 
     231      isOpen = false;       
    234232 
    235233//      if (!record_offset.isEmpty() && record_offset < 0) 
     
    238236      const int recordOffset = record_offset.isEmpty() ? 0 : record_offset; 
    239237 
    240       // set<CAxis*> setAxis; 
    241       // set<CDomain*> setDomains; 
    242238      set<StdString> setAxis; 
    243239      set<StdString> setDomains; 
    244        
     240 
    245241      std::vector<CField*>::iterator it, end = this->enabledFields.end(); 
    246242      for (it = this->enabledFields.begin(); it != end; it++) 
    247243      { 
    248          CField* field = *it; 
    249          allDomainEmpty &= !field->grid->doGridHaveDataToWrite(); 
     244         CField* field = *it;          
    250245         std::vector<CAxis*> vecAxis = field->grid->getAxis(); 
    251246         for (size_t i = 0; i < vecAxis.size(); ++i) 
    252             setAxis.insert(vecAxis[i]->getAxisOutputName()); 
    253             // setAxis.insert(vecAxis[i]); 
     247           setAxis.insert(vecAxis[i]->getAxisOutputName()); 
    254248         std::vector<CDomain*> vecDomains = field->grid->getDomains(); 
    255249         for (size_t i = 0; i < vecDomains.size(); ++i) 
    256             setDomains.insert(vecDomains[i]->getDomainOutputName()); 
    257             // setDomains.insert(vecDomains[i]); 
     250           setDomains.insert(vecDomains[i]->getDomainOutputName()); 
    258251 
    259252         field->resetNStep(recordOffset); 
     
    263256 
    264257      // create sub communicator for file 
    265       int color = allDomainEmpty ? 0 : 1; 
    266       ep_lib::MPI_Comm_split(server->intraComm, color, server->intraCommRank, &fileComm); 
    267       if (allDomainEmpty) ep_lib::MPI_Comm_free(&fileComm); 
     258      createSubComFile(); 
    268259 
    269260      // if (time_counter.isEmpty()) time_counter.setValue(time_counter_attr::centered); 
     
    271262    } 
    272263 
    273     //! Verify state of a file 
    274     void CFile::checkFile(void) 
     264    //! Initialize a file in order to write into it 
     265    void CFile::initRead(void) 
    275266    { 
    276       if (mode.isEmpty() || mode.getValue() == mode_attr::write) 
    277       { 
    278         CTimer::get("Files : create headers").resume(); 
    279         if (!isOpen) createHeader(); 
    280         CTimer::get("Files : create headers").suspend(); 
    281         checkSync(); 
    282       } 
    283       else 
    284       { 
    285         CTimer::get("Files : open headers").resume(); 
    286         if (!isOpen) openInReadMode(); 
    287         CTimer::get("Files : open headers").suspend(); 
    288       } 
    289       checkSplit(); 
     267      if (checkRead) return; 
     268      createSubComFile(); 
     269      checkRead = true; 
     270    } 
     271 
     272    /*! 
     273      Create a sub communicator in which processes participate in reading/opening file 
     274    */ 
     275    void CFile::createSubComFile() 
     276    { 
     277      CContext* context = CContext::getCurrent(); 
     278      CContextServer* server = context->server; 
     279 
     280      // create sub communicator for file 
     281      allZoneEmpty = true;       
     282      std::vector<CField*>::iterator it, end = this->enabledFields.end(); 
     283      for (it = this->enabledFields.begin(); it != end; it++) 
     284      { 
     285         CField* field = *it; 
     286         bool nullGrid = (0 == field->grid); 
     287         allZoneEmpty &= nullGrid ? false : !field->grid->doGridHaveDataToWrite(); 
     288      } 
     289 
     290      int color = allZoneEmpty ? 0 : 1; 
     291      ep_lib::MPI_Comm_split(server->intraComm, color, server->intraCommRank, &fileComm); 
     292      if (allZoneEmpty) ep_lib::MPI_Comm_free(&fileComm); 
     293    } 
     294 
     295    /* 
     296       Check condition to write into a file 
     297       For now, we only use the level-2 server to write files (if this mode is activated) 
     298       or classical server to do this job. 
     299    */ 
     300    void CFile::checkWriteFile(void) 
     301    { 
     302      CContext* context = CContext::getCurrent(); 
     303      // Done by classical server or secondary server 
     304      // This condition should be changed soon 
     305      if (CServer::serverLevel == 0 || CServer::serverLevel == 2) 
     306      { 
     307        if (mode.isEmpty() || mode.getValue() == mode_attr::write) 
     308        { 
     309          CTimer::get("Files : create headers").resume(); 
     310          if (!isOpen) createHeader(); 
     311          CTimer::get("Files : create headers").suspend(); 
     312          checkSync(); 
     313        }         
     314        checkSplit(); // REally need this? 
     315      } 
     316    } 
     317 
     318    /* 
     319       Check condition to read from a file 
     320       For now, we only use the level-1 server to write files (if this mode is activated) 
     321       or classical server to do this job. 
     322       This function can be used by client for reading metadata 
     323    */ 
     324    void CFile::checkReadFile(void) 
     325    { 
     326      CContext* context = CContext::getCurrent(); 
     327      // Done by classical server or secondary server 
     328      // TODO: This condition should be changed soon. It only works with maximum number of level as 2 
     329      if (CServer::serverLevel == 0 || CServer::serverLevel == 1) 
     330      { 
     331        if (!mode.isEmpty() && mode.getValue() == mode_attr::read) 
     332        { 
     333          CTimer::get("Files : open headers").resume(); 
     334           
     335          if (!isOpen) openInReadMode(); 
     336 
     337          CTimer::get("Files : open headers").suspend(); 
     338        } 
     339        //checkSplit(); // Really need for reading? 
     340      } 
     341    } 
     342 
     343    /*! 
     344      Verify if a process participates in an opening-file communicator  
     345      \return true if the process doesn't participate in opening file 
     346    */ 
     347    bool CFile::isEmptyZone() 
     348    { 
     349      return allZoneEmpty; 
    290350    } 
    291351 
     
    352412      CContextServer* server = context->server; 
    353413 
    354       if (!allDomainEmpty) 
     414      if (!allZoneEmpty) 
    355415      { 
    356416         StdString filename = getFileOutputName(); 
     
    381441         if (pos2!=std::string::npos) 
    382442         { 
    383            middlePart=filename.substr(pos1,pos2-pos1) ;            
     443           middlePart=filename.substr(pos1,pos2-pos1) ; 
    384444           pos2+=strEndDate.size() ; 
    385445           lastPart=filename.substr(pos2,filename.size()-pos2) ; 
     
    434494           oss << lastPart ; 
    435495 
    436            StdString keySuffix("CContext_"+CContext::getCurrent()->getId()+"::CFile_"+getFileOutputName()+"::") ;  
     496           StdString keySuffix("CFile::"+getFileOutputName()+"::") ;  
    437497           context->registryOut->setKey(keySuffix+"splitStart", lastSplit); 
    438498           context->registryOut->setKey(keySuffix+"splitEnd",   splitEnd); 
     
    534594  \brief Open an existing NetCDF file in read-only mode 
    535595  */ 
    536   void CFile::openInReadMode(void) 
     596  void CFile::openInReadMode() 
    537597  { 
    538598    CContext* context = CContext::getCurrent(); 
    539599    CContextServer* server = context->server; 
    540  
    541     if (!allDomainEmpty) 
     600    ep_lib::MPI_Comm readComm = this->fileComm; 
     601 
     602    if (!allZoneEmpty) 
    542603    { 
    543604      StdString filename = getFileOutputName(); 
     
    578639      { 
    579640        int commSize, commRank; 
    580         ep_lib::MPI_Comm_size(fileComm, &commSize); 
    581         ep_lib::MPI_Comm_rank(fileComm, &commRank); 
     641        ep_lib::MPI_Comm_size(readComm, &commSize); 
     642        ep_lib::MPI_Comm_rank(readComm, &commRank); 
    582643 
    583644        if (server->intraCommSize > 1) 
     
    597658      bool isCollective = par_access.isEmpty() || par_access == par_access_attr::collective; 
    598659      #ifdef _usingEP 
    599         //printf("multifile was %d\n", multifile); 
    600         //multifile = true; 
    601660        if (isOpen) data_out->closeFile(); 
    602         if (time_counter_name.isEmpty()) data_in = boost::shared_ptr<CDataInput>(new CNc4DataInput(oss.str(), fileComm, multifile, isCollective)); 
    603         else data_in = boost::shared_ptr<CDataInput>(new CNc4DataInput(oss.str(), fileComm, multifile, isCollective, time_counter_name)); 
     661        if (time_counter_name.isEmpty()) data_in = boost::shared_ptr<CDataInput>(new CNc4DataInput(oss.str(), readComm, multifile, isCollective)); 
     662        else data_in = boost::shared_ptr<CDataInput>(new CNc4DataInput(oss.str(), readComm, multifile, isCollective, time_counter_name)); 
    604663        isOpen = true; 
    605664      #elif _usingMPI 
    606665        if (isOpen) data_out->closeFile(); 
    607         if (time_counter_name.isEmpty()) data_in = boost::shared_ptr<CDataInput>(new CNc4DataInput(oss.str(), fileComm, multifile, isCollective)); 
    608         else data_in = boost::shared_ptr<CDataInput>(new CNc4DataInput(oss.str(), fileComm, multifile, isCollective, time_counter_name)); 
     666        if (time_counter_name.isEmpty()) data_in = boost::shared_ptr<CDataInput>(new CNc4DataInput(oss.str(), readComm, multifile, isCollective)); 
     667        else data_in = boost::shared_ptr<CDataInput>(new CNc4DataInput(oss.str(), readComm, multifile, isCollective, time_counter_name)); 
    609668        isOpen = true; 
    610669      #endif 
     
    615674   void CFile::close(void) 
    616675   { 
    617      if (!allDomainEmpty) 
     676     if (!allZoneEmpty) 
    618677       if (isOpen) 
    619678       { 
     
    622681         else 
    623682          this->data_in->closeFile(); 
     683        isOpen = false; 
    624684       } 
    625 //      if (fileComm != MPI_COMM_NULL) MPI_Comm_free(&fileComm); 
    626       //if (fileComm.mpi_comm != ::MPI_COMM_NULL) MPI_Comm_free(&fileComm); 
     685      //if (fileComm != MPI_COMM_NULL) MPI_Comm_free(&fileComm); 
    627686   } 
    628687   //---------------------------------------------------------------- 
     
    633692 
    634693     // Just check file and try to open it 
    635      CContext* context = CContext::getCurrent(); 
    636      CContextClient* client=context->client; 
    637  
    638      // It would probably be better to call initFile() somehow 
    639      ep_lib::MPI_Comm_dup(client->intraComm, &fileComm); 
    640694     if (time_counter_name.isEmpty()) time_counter_name = "time_counter"; 
    641695 
    642      checkFile(); 
     696     checkReadFile(); 
    643697 
    644698     for (int idx = 0; idx < enabledFields.size(); ++idx) 
     
    655709        // Read necessary value from file 
    656710        #pragma omp critical (_func) 
    657         { 
    658           this->data_in->readFieldAttributesValues(enabledFields[idx]); 
    659         } 
     711        this->data_in->readFieldAttributesValues(enabledFields[idx]); 
     712 
    660713        // Fill attributes for base reference 
    661714        enabledFields[idx]->solveGridDomainAxisBaseRef(); 
     
    737790     { 
    738791       this->enabledFields[i]->solveOnlyReferenceEnabledField(sendToServer); 
    739 //       this->enabledFields[i]->buildGridTransformationGraph(); 
     792     } 
     793   } 
     794 
     795   void CFile::checkGridOfEnabledFields() 
     796   {  
     797     int size = this->enabledFields.size(); 
     798     for (int i = 0; i < size; ++i) 
     799     { 
     800       this->enabledFields[i]->checkGridOfEnabledFields(); 
     801     } 
     802   } 
     803 
     804   void CFile::sendGridComponentOfEnabledFields() 
     805   {  
     806     int size = this->enabledFields.size(); 
     807     for (int i = 0; i < size; ++i) 
     808     { 
     809       this->enabledFields[i]->sendGridComponentOfEnabledFields(); 
     810     } 
     811   } 
     812 
     813   void CFile::sendGridOfEnabledFields() 
     814   {  
     815     int size = this->enabledFields.size(); 
     816     for (int i = 0; i < size; ++i) 
     817     { 
     818       this->enabledFields[i]->sendGridOfEnabledFields(); 
    740819     } 
    741820   } 
     
    758837   \param [in] sendToServer: Send all info to server (true) or only a part of it (false) 
    759838   */ 
    760    void CFile::solveAllRefOfEnabledFields(bool sendToServer) 
     839   void CFile::solveAllRefOfEnabledFieldsAndTransform(bool sendToServer) 
    761840   { 
    762841     int size = this->enabledFields.size(); 
    763842     for (int i = 0; i < size; ++i) 
    764843     { 
    765        this->enabledFields[i]->solveAllReferenceEnabledField(sendToServer); 
     844       this->enabledFields[i]->solveAllEnabledFieldsAndTransform(); 
    766845     } 
    767846   } 
     
    782861 
    783862   /*! 
     863    * Post-process the filter graph for each active field. 
     864    */ 
     865   void CFile::postProcessFilterGraph() 
     866   { 
     867     int size = this->enabledFields.size(); 
     868     for (int i = 0; i < size; ++i) 
     869     { 
     870       this->enabledFields[i]->checkIfMustAutoTrigger(); 
     871     } 
     872   } 
     873 
     874   /*! 
    784875     Prefetching the data for enabled fields read from file. 
    785876   */ 
     
    795886 
    796887   /*! 
     888     Do all pre timestep operations for enabled fields in read mode: 
     889      - Check that the data excepted from server has been received 
     890      - Check if some filters must auto-trigger 
     891   */ 
     892   void CFile::doPreTimestepOperationsForEnabledReadModeFields(void) 
     893   { 
     894     if (mode.isEmpty() || mode.getValue() != mode_attr::read) 
     895       return; 
     896 
     897     int size = this->enabledFields.size(); 
     898     for (int i = 0; i < size; ++i) 
     899     { 
     900       this->enabledFields[i]->checkForLateDataFromServer(); 
     901       this->enabledFields[i]->autoTriggerIfNeeded(); 
     902     } 
     903   } 
     904 
     905   /*! 
    797906     Do all post timestep operations for enabled fields in read mode: 
    798907      - Prefetch the data read from file when needed 
    799       - Check that the data excepted from server has been received 
    800908   */ 
    801909   void CFile::doPostTimestepOperationsForEnabledReadModeFields(void) 
     
    807915     for (int i = 0; i < size; ++i) 
    808916     { 
    809        this->enabledFields[i]->checkForLateDataFromServer(); 
    810917       this->enabledFields[i]->sendReadDataRequestIfNeeded(); 
    811918     } 
     
    874981   } 
    875982 
     983   void CFile::setContextClient(CContextClient* newContextClient) 
     984   { 
     985     client = newContextClient; 
     986     size_t size = this->enabledFields.size(); 
     987     for (size_t i = 0; i < size; ++i) 
     988     { 
     989       this->enabledFields[i]->setContextClient(newContextClient); 
     990     } 
     991   } 
     992 
     993   CContextClient* CFile::getContextClient() 
     994   { 
     995     return client; 
     996   } 
     997 
     998   void CFile::setReadContextClient(CContextClient* readContextclient) 
     999   { 
     1000     read_client = readContextclient; 
     1001   } 
     1002 
     1003   CContextClient* CFile::getReadContextClient() 
     1004   { 
     1005     return read_client; 
     1006   } 
     1007 
    8761008   /*! 
    8771009   \brief Send a message to create a field on server side 
    8781010   \param[in] id String identity of field that will be created on server 
    8791011   */ 
    880    void CFile::sendAddField(const string& id) 
    881    { 
    882     CContext* context = CContext::getCurrent(); 
    883  
    884     if (! context->hasServer ) 
    885     { 
    886        CContextClient* client = context->client; 
    887  
    888        CEventClient event(this->getType(),EVENT_ID_ADD_FIELD); 
    889        if (client->isServerLeader()) 
    890        { 
    891          CMessage msg; 
    892          msg << this->getId(); 
    893          msg << id; 
    894          const std::list<int>& ranks = client->getRanksServerLeader(); 
    895          for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
    896            event.push(*itRank,1,msg); 
    897          client->sendEvent(event); 
    898        } 
    899        else client->sendEvent(event); 
    900     } 
    901  
     1012   void CFile::sendAddField(const string& id, CContextClient* client) 
     1013   { 
     1014      sendAddItem(id, EVENT_ID_ADD_FIELD, client); 
    9021015   } 
    9031016 
     
    9061019   \param[in] id String identity of field group that will be created on server 
    9071020   */ 
    908    void CFile::sendAddFieldGroup(const string& id) 
    909    { 
    910     CContext* context = CContext::getCurrent(); 
    911     if (! context->hasServer ) 
    912     { 
    913        CContextClient* client = context->client; 
    914  
    915        CEventClient event(this->getType(),EVENT_ID_ADD_FIELD_GROUP); 
    916        if (client->isServerLeader()) 
    917        { 
    918          CMessage msg; 
    919          msg << this->getId(); 
    920          msg << id; 
    921          const std::list<int>& ranks = client->getRanksServerLeader(); 
    922          for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
    923            event.push(*itRank,1,msg); 
    924          client->sendEvent(event); 
    925        } 
    926        else client->sendEvent(event); 
    927     } 
    928  
     1021   void CFile::sendAddFieldGroup(const string& id, CContextClient* client) 
     1022   { 
     1023      sendAddItem(id, (int)EVENT_ID_ADD_FIELD_GROUP, client); 
    9291024   } 
    9301025 
     
    9831078   is to duplicate this value on server, too. 
    9841079   */ 
    985    void CFile::sendAddAllVariables() 
     1080   void CFile::sendAddAllVariables(CContextClient* client) 
    9861081   { 
    9871082     std::vector<CVariable*> allVar = getAllVariables(); 
     
    9911086     for (; it != itE; ++it) 
    9921087     { 
    993        this->sendAddVariable((*it)->getId()); 
    994        (*it)->sendAllAttributesToServer(); 
    995        (*it)->sendValue(); 
    996      } 
    997    } 
    998  
    999    /*! 
    1000    \brief Send a message to create a variable on server side 
    1001       A variable always belongs to a variable group 
    1002    \param[in] id String identity of variable that will be created on server 
    1003    */ 
    1004    void CFile::sendAddVariable(const string& id) 
    1005    { 
    1006     CContext* context = CContext::getCurrent(); 
    1007  
    1008     if (! context->hasServer ) 
    1009     { 
    1010        CContextClient* client = context->client; 
    1011  
    1012        CEventClient event(this->getType(),EVENT_ID_ADD_VARIABLE); 
    1013        if (client->isServerLeader()) 
    1014        { 
    1015          CMessage msg; 
    1016          msg << this->getId(); 
    1017          msg << id; 
    1018          const std::list<int>& ranks = client->getRanksServerLeader(); 
    1019          for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
    1020            event.push(*itRank,1,msg); 
    1021          client->sendEvent(event); 
    1022        } 
    1023        else client->sendEvent(event); 
    1024     } 
    1025  
     1088       this->sendAddVariable((*it)->getId(), client); 
     1089       (*it)->sendAllAttributesToServer(client); 
     1090       (*it)->sendValue(client); 
     1091     } 
    10261092   } 
    10271093 
     
    10291095   \brief Send a message to create a variable group on server side 
    10301096   \param[in] id String identity of variable group that will be created on server 
    1031    */ 
    1032    void CFile::sendAddVariableGroup(const string& id) 
    1033    { 
    1034     CContext* context = CContext::getCurrent(); 
    1035     if (! context->hasServer ) 
    1036     { 
    1037        CContextClient* client = context->client; 
    1038  
    1039        CEventClient event(this->getType(),EVENT_ID_ADD_VARIABLE_GROUP); 
    1040        if (client->isServerLeader()) 
    1041        { 
    1042          CMessage msg; 
    1043          msg << this->getId(); 
    1044          msg << id; 
    1045          const std::list<int>& ranks = client->getRanksServerLeader(); 
    1046          for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
    1047            event.push(*itRank,1,msg); 
    1048          client->sendEvent(event); 
    1049        } 
    1050        else client->sendEvent(event); 
    1051     } 
    1052  
     1097   \param [in] client client to which we will send this adding action 
     1098   */ 
     1099   void CFile::sendAddVariableGroup(const string& id, CContextClient* client) 
     1100   { 
     1101      sendAddItem(id, (int)EVENT_ID_ADD_VARIABLE_GROUP, client); 
     1102   } 
     1103 
     1104   /* 
     1105     Send message to add a variable into a file within a certain client 
     1106     \param [in] id String identity of a variable 
     1107     \param [in] client client to which we will send this adding action 
     1108   */ 
     1109   void CFile::sendAddVariable(const string& id, CContextClient* client) 
     1110   { 
     1111      sendAddItem(id, (int)EVENT_ID_ADD_VARIABLE, client); 
    10531112   } 
    10541113 
     
    11081167   Remark: This function must be called AFTER all active (enabled) files have been created on the server side 
    11091168   */ 
    1110    void CFile::sendEnabledFields() 
     1169   void CFile::sendEnabledFields(CContextClient* client) 
    11111170   { 
    11121171     size_t size = this->enabledFields.size(); 
     
    11141173     { 
    11151174       CField* field = this->enabledFields[i]; 
    1116        this->sendAddField(field->getId()); 
    1117        field->checkAttributes(); 
    1118        field->sendAllAttributesToServer(); 
    1119        field->sendAddAllVariables(); 
    1120      } 
    1121    } 
     1175       this->sendAddField(field->getId(), client); 
     1176       field->checkTimeAttributes(); 
     1177       field->sendAllAttributesToServer(client); 
     1178       field->sendAddAllVariables(client); 
     1179     } 
     1180   } 
     1181 
    11221182 
    11231183   /*! 
Note: See TracChangeset for help on using the changeset viewer.