Changeset 1460 for XIOS/dev/branch_openmp/src/node/file.cpp
- Timestamp:
- 03/22/18 10:43:20 (6 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/branch_openmp/src/node/file.cpp
r1338 r1460 17 17 #include "mpi.hpp" 18 18 #include "timer.hpp" 19 19 #include "server.hpp" 20 20 21 21 namespace xios { … … 26 26 : CObjectTemplate<CFile>(), CFileAttributes() 27 27 , vFieldGroup(), data_out(), enabledFields() 28 , allDomainEmpty(false), isOpen(false)28 , isOpen(false), read_client(0), checkRead(false), allZoneEmpty(false) 29 29 { 30 30 setVirtualFieldGroup(CFieldGroup::create(getId() + "_virtual_field_group")); … … 35 35 : CObjectTemplate<CFile>(id), CFileAttributes() 36 36 , vFieldGroup(), data_out(), enabledFields() 37 , allDomainEmpty(false), isOpen(false)37 , isOpen(false), read_client(0), checkRead(false), allZoneEmpty(false) 38 38 { 39 39 setVirtualFieldGroup(CFieldGroup::create(getId() + "_virtual_field_group")); … … 208 208 209 209 //! Initialize a file in order to write into it 210 void CFile::init File(void)210 void CFile::initWrite(void) 211 211 { 212 212 CContext* context = CContext::getCurrent(); … … 218 218 if (!split_freq.isEmpty()) 219 219 { 220 StdString keySuffix("C Context_"+CContext::getCurrent()->getId()+"::CFile_"+getFileOutputName()+"::") ;220 StdString keySuffix("CFile::"+getFileOutputName()+"::") ; 221 221 if (context->registryIn->foundKey(keySuffix+"splitStart") && context->registryIn->foundKey(keySuffix+"splitEnd")) 222 222 { … … 229 229 } 230 230 } 231 isOpen = false; 232 233 allDomainEmpty = true; 231 isOpen = false; 234 232 235 233 // if (!record_offset.isEmpty() && record_offset < 0) … … 238 236 const int recordOffset = record_offset.isEmpty() ? 0 : record_offset; 239 237 240 // set<CAxis*> setAxis;241 // set<CDomain*> setDomains;242 238 set<StdString> setAxis; 243 239 set<StdString> setDomains; 244 240 245 241 std::vector<CField*>::iterator it, end = this->enabledFields.end(); 246 242 for (it = this->enabledFields.begin(); it != end; it++) 247 243 { 248 CField* field = *it; 249 allDomainEmpty &= !field->grid->doGridHaveDataToWrite(); 244 CField* field = *it; 250 245 std::vector<CAxis*> vecAxis = field->grid->getAxis(); 251 246 for (size_t i = 0; i < vecAxis.size(); ++i) 252 setAxis.insert(vecAxis[i]->getAxisOutputName()); 253 // setAxis.insert(vecAxis[i]); 247 setAxis.insert(vecAxis[i]->getAxisOutputName()); 254 248 std::vector<CDomain*> vecDomains = field->grid->getDomains(); 255 249 for (size_t i = 0; i < vecDomains.size(); ++i) 256 setDomains.insert(vecDomains[i]->getDomainOutputName()); 257 // setDomains.insert(vecDomains[i]); 250 setDomains.insert(vecDomains[i]->getDomainOutputName()); 258 251 259 252 field->resetNStep(recordOffset); … … 263 256 264 257 // create sub communicator for file 265 int color = allDomainEmpty ? 0 : 1; 266 ep_lib::MPI_Comm_split(server->intraComm, color, server->intraCommRank, &fileComm); 267 if (allDomainEmpty) ep_lib::MPI_Comm_free(&fileComm); 258 createSubComFile(); 268 259 269 260 // if (time_counter.isEmpty()) time_counter.setValue(time_counter_attr::centered); … … 271 262 } 272 263 273 //! Verify state of a file274 void CFile:: checkFile(void)264 //! Initialize a file in order to write into it 265 void CFile::initRead(void) 275 266 { 276 if (mode.isEmpty() || mode.getValue() == mode_attr::write) 277 { 278 CTimer::get("Files : create headers").resume(); 279 if (!isOpen) createHeader(); 280 CTimer::get("Files : create headers").suspend(); 281 checkSync(); 282 } 283 else 284 { 285 CTimer::get("Files : open headers").resume(); 286 if (!isOpen) openInReadMode(); 287 CTimer::get("Files : open headers").suspend(); 288 } 289 checkSplit(); 267 if (checkRead) return; 268 createSubComFile(); 269 checkRead = true; 270 } 271 272 /*! 273 Create a sub communicator in which processes participate in reading/opening file 274 */ 275 void CFile::createSubComFile() 276 { 277 CContext* context = CContext::getCurrent(); 278 CContextServer* server = context->server; 279 280 // create sub communicator for file 281 allZoneEmpty = true; 282 std::vector<CField*>::iterator it, end = this->enabledFields.end(); 283 for (it = this->enabledFields.begin(); it != end; it++) 284 { 285 CField* field = *it; 286 bool nullGrid = (0 == field->grid); 287 allZoneEmpty &= nullGrid ? false : !field->grid->doGridHaveDataToWrite(); 288 } 289 290 int color = allZoneEmpty ? 0 : 1; 291 ep_lib::MPI_Comm_split(server->intraComm, color, server->intraCommRank, &fileComm); 292 if (allZoneEmpty) ep_lib::MPI_Comm_free(&fileComm); 293 } 294 295 /* 296 Check condition to write into a file 297 For now, we only use the level-2 server to write files (if this mode is activated) 298 or classical server to do this job. 299 */ 300 void CFile::checkWriteFile(void) 301 { 302 CContext* context = CContext::getCurrent(); 303 // Done by classical server or secondary server 304 // This condition should be changed soon 305 if (CServer::serverLevel == 0 || CServer::serverLevel == 2) 306 { 307 if (mode.isEmpty() || mode.getValue() == mode_attr::write) 308 { 309 CTimer::get("Files : create headers").resume(); 310 if (!isOpen) createHeader(); 311 CTimer::get("Files : create headers").suspend(); 312 checkSync(); 313 } 314 checkSplit(); // REally need this? 315 } 316 } 317 318 /* 319 Check condition to read from a file 320 For now, we only use the level-1 server to write files (if this mode is activated) 321 or classical server to do this job. 322 This function can be used by client for reading metadata 323 */ 324 void CFile::checkReadFile(void) 325 { 326 CContext* context = CContext::getCurrent(); 327 // Done by classical server or secondary server 328 // TODO: This condition should be changed soon. It only works with maximum number of level as 2 329 if (CServer::serverLevel == 0 || CServer::serverLevel == 1) 330 { 331 if (!mode.isEmpty() && mode.getValue() == mode_attr::read) 332 { 333 CTimer::get("Files : open headers").resume(); 334 335 if (!isOpen) openInReadMode(); 336 337 CTimer::get("Files : open headers").suspend(); 338 } 339 //checkSplit(); // Really need for reading? 340 } 341 } 342 343 /*! 344 Verify if a process participates in an opening-file communicator 345 \return true if the process doesn't participate in opening file 346 */ 347 bool CFile::isEmptyZone() 348 { 349 return allZoneEmpty; 290 350 } 291 351 … … 352 412 CContextServer* server = context->server; 353 413 354 if (!all DomainEmpty)414 if (!allZoneEmpty) 355 415 { 356 416 StdString filename = getFileOutputName(); … … 381 441 if (pos2!=std::string::npos) 382 442 { 383 middlePart=filename.substr(pos1,pos2-pos1) ; 443 middlePart=filename.substr(pos1,pos2-pos1) ; 384 444 pos2+=strEndDate.size() ; 385 445 lastPart=filename.substr(pos2,filename.size()-pos2) ; … … 434 494 oss << lastPart ; 435 495 436 StdString keySuffix("C Context_"+CContext::getCurrent()->getId()+"::CFile_"+getFileOutputName()+"::") ;496 StdString keySuffix("CFile::"+getFileOutputName()+"::") ; 437 497 context->registryOut->setKey(keySuffix+"splitStart", lastSplit); 438 498 context->registryOut->setKey(keySuffix+"splitEnd", splitEnd); … … 534 594 \brief Open an existing NetCDF file in read-only mode 535 595 */ 536 void CFile::openInReadMode( void)596 void CFile::openInReadMode() 537 597 { 538 598 CContext* context = CContext::getCurrent(); 539 599 CContextServer* server = context->server; 540 541 if (!allDomainEmpty) 600 ep_lib::MPI_Comm readComm = this->fileComm; 601 602 if (!allZoneEmpty) 542 603 { 543 604 StdString filename = getFileOutputName(); … … 578 639 { 579 640 int commSize, commRank; 580 ep_lib::MPI_Comm_size( fileComm, &commSize);581 ep_lib::MPI_Comm_rank( fileComm, &commRank);641 ep_lib::MPI_Comm_size(readComm, &commSize); 642 ep_lib::MPI_Comm_rank(readComm, &commRank); 582 643 583 644 if (server->intraCommSize > 1) … … 597 658 bool isCollective = par_access.isEmpty() || par_access == par_access_attr::collective; 598 659 #ifdef _usingEP 599 //printf("multifile was %d\n", multifile);600 //multifile = true;601 660 if (isOpen) data_out->closeFile(); 602 if (time_counter_name.isEmpty()) data_in = boost::shared_ptr<CDataInput>(new CNc4DataInput(oss.str(), fileComm, multifile, isCollective));603 else data_in = boost::shared_ptr<CDataInput>(new CNc4DataInput(oss.str(), fileComm, multifile, isCollective, time_counter_name));661 if (time_counter_name.isEmpty()) data_in = boost::shared_ptr<CDataInput>(new CNc4DataInput(oss.str(), readComm, multifile, isCollective)); 662 else data_in = boost::shared_ptr<CDataInput>(new CNc4DataInput(oss.str(), readComm, multifile, isCollective, time_counter_name)); 604 663 isOpen = true; 605 664 #elif _usingMPI 606 665 if (isOpen) data_out->closeFile(); 607 if (time_counter_name.isEmpty()) data_in = boost::shared_ptr<CDataInput>(new CNc4DataInput(oss.str(), fileComm, multifile, isCollective));608 else data_in = boost::shared_ptr<CDataInput>(new CNc4DataInput(oss.str(), fileComm, multifile, isCollective, time_counter_name));666 if (time_counter_name.isEmpty()) data_in = boost::shared_ptr<CDataInput>(new CNc4DataInput(oss.str(), readComm, multifile, isCollective)); 667 else data_in = boost::shared_ptr<CDataInput>(new CNc4DataInput(oss.str(), readComm, multifile, isCollective, time_counter_name)); 609 668 isOpen = true; 610 669 #endif … … 615 674 void CFile::close(void) 616 675 { 617 if (!all DomainEmpty)676 if (!allZoneEmpty) 618 677 if (isOpen) 619 678 { … … 622 681 else 623 682 this->data_in->closeFile(); 683 isOpen = false; 624 684 } 625 // if (fileComm != MPI_COMM_NULL) MPI_Comm_free(&fileComm); 626 //if (fileComm.mpi_comm != ::MPI_COMM_NULL) MPI_Comm_free(&fileComm); 685 //if (fileComm != MPI_COMM_NULL) MPI_Comm_free(&fileComm); 627 686 } 628 687 //---------------------------------------------------------------- … … 633 692 634 693 // Just check file and try to open it 635 CContext* context = CContext::getCurrent();636 CContextClient* client=context->client;637 638 // It would probably be better to call initFile() somehow639 ep_lib::MPI_Comm_dup(client->intraComm, &fileComm);640 694 if (time_counter_name.isEmpty()) time_counter_name = "time_counter"; 641 695 642 check File();696 checkReadFile(); 643 697 644 698 for (int idx = 0; idx < enabledFields.size(); ++idx) … … 655 709 // Read necessary value from file 656 710 #pragma omp critical (_func) 657 { 658 this->data_in->readFieldAttributesValues(enabledFields[idx]); 659 } 711 this->data_in->readFieldAttributesValues(enabledFields[idx]); 712 660 713 // Fill attributes for base reference 661 714 enabledFields[idx]->solveGridDomainAxisBaseRef(); … … 737 790 { 738 791 this->enabledFields[i]->solveOnlyReferenceEnabledField(sendToServer); 739 // this->enabledFields[i]->buildGridTransformationGraph(); 792 } 793 } 794 795 void CFile::checkGridOfEnabledFields() 796 { 797 int size = this->enabledFields.size(); 798 for (int i = 0; i < size; ++i) 799 { 800 this->enabledFields[i]->checkGridOfEnabledFields(); 801 } 802 } 803 804 void CFile::sendGridComponentOfEnabledFields() 805 { 806 int size = this->enabledFields.size(); 807 for (int i = 0; i < size; ++i) 808 { 809 this->enabledFields[i]->sendGridComponentOfEnabledFields(); 810 } 811 } 812 813 void CFile::sendGridOfEnabledFields() 814 { 815 int size = this->enabledFields.size(); 816 for (int i = 0; i < size; ++i) 817 { 818 this->enabledFields[i]->sendGridOfEnabledFields(); 740 819 } 741 820 } … … 758 837 \param [in] sendToServer: Send all info to server (true) or only a part of it (false) 759 838 */ 760 void CFile::solveAllRefOfEnabledFields (bool sendToServer)839 void CFile::solveAllRefOfEnabledFieldsAndTransform(bool sendToServer) 761 840 { 762 841 int size = this->enabledFields.size(); 763 842 for (int i = 0; i < size; ++i) 764 843 { 765 this->enabledFields[i]->solveAll ReferenceEnabledField(sendToServer);844 this->enabledFields[i]->solveAllEnabledFieldsAndTransform(); 766 845 } 767 846 } … … 782 861 783 862 /*! 863 * Post-process the filter graph for each active field. 864 */ 865 void CFile::postProcessFilterGraph() 866 { 867 int size = this->enabledFields.size(); 868 for (int i = 0; i < size; ++i) 869 { 870 this->enabledFields[i]->checkIfMustAutoTrigger(); 871 } 872 } 873 874 /*! 784 875 Prefetching the data for enabled fields read from file. 785 876 */ … … 795 886 796 887 /*! 888 Do all pre timestep operations for enabled fields in read mode: 889 - Check that the data excepted from server has been received 890 - Check if some filters must auto-trigger 891 */ 892 void CFile::doPreTimestepOperationsForEnabledReadModeFields(void) 893 { 894 if (mode.isEmpty() || mode.getValue() != mode_attr::read) 895 return; 896 897 int size = this->enabledFields.size(); 898 for (int i = 0; i < size; ++i) 899 { 900 this->enabledFields[i]->checkForLateDataFromServer(); 901 this->enabledFields[i]->autoTriggerIfNeeded(); 902 } 903 } 904 905 /*! 797 906 Do all post timestep operations for enabled fields in read mode: 798 907 - Prefetch the data read from file when needed 799 - Check that the data excepted from server has been received800 908 */ 801 909 void CFile::doPostTimestepOperationsForEnabledReadModeFields(void) … … 807 915 for (int i = 0; i < size; ++i) 808 916 { 809 this->enabledFields[i]->checkForLateDataFromServer();810 917 this->enabledFields[i]->sendReadDataRequestIfNeeded(); 811 918 } … … 874 981 } 875 982 983 void CFile::setContextClient(CContextClient* newContextClient) 984 { 985 client = newContextClient; 986 size_t size = this->enabledFields.size(); 987 for (size_t i = 0; i < size; ++i) 988 { 989 this->enabledFields[i]->setContextClient(newContextClient); 990 } 991 } 992 993 CContextClient* CFile::getContextClient() 994 { 995 return client; 996 } 997 998 void CFile::setReadContextClient(CContextClient* readContextclient) 999 { 1000 read_client = readContextclient; 1001 } 1002 1003 CContextClient* CFile::getReadContextClient() 1004 { 1005 return read_client; 1006 } 1007 876 1008 /*! 877 1009 \brief Send a message to create a field on server side 878 1010 \param[in] id String identity of field that will be created on server 879 1011 */ 880 void CFile::sendAddField(const string& id) 881 { 882 CContext* context = CContext::getCurrent(); 883 884 if (! context->hasServer ) 885 { 886 CContextClient* client = context->client; 887 888 CEventClient event(this->getType(),EVENT_ID_ADD_FIELD); 889 if (client->isServerLeader()) 890 { 891 CMessage msg; 892 msg << this->getId(); 893 msg << id; 894 const std::list<int>& ranks = client->getRanksServerLeader(); 895 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 896 event.push(*itRank,1,msg); 897 client->sendEvent(event); 898 } 899 else client->sendEvent(event); 900 } 901 1012 void CFile::sendAddField(const string& id, CContextClient* client) 1013 { 1014 sendAddItem(id, EVENT_ID_ADD_FIELD, client); 902 1015 } 903 1016 … … 906 1019 \param[in] id String identity of field group that will be created on server 907 1020 */ 908 void CFile::sendAddFieldGroup(const string& id) 909 { 910 CContext* context = CContext::getCurrent(); 911 if (! context->hasServer ) 912 { 913 CContextClient* client = context->client; 914 915 CEventClient event(this->getType(),EVENT_ID_ADD_FIELD_GROUP); 916 if (client->isServerLeader()) 917 { 918 CMessage msg; 919 msg << this->getId(); 920 msg << id; 921 const std::list<int>& ranks = client->getRanksServerLeader(); 922 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 923 event.push(*itRank,1,msg); 924 client->sendEvent(event); 925 } 926 else client->sendEvent(event); 927 } 928 1021 void CFile::sendAddFieldGroup(const string& id, CContextClient* client) 1022 { 1023 sendAddItem(id, (int)EVENT_ID_ADD_FIELD_GROUP, client); 929 1024 } 930 1025 … … 983 1078 is to duplicate this value on server, too. 984 1079 */ 985 void CFile::sendAddAllVariables( )1080 void CFile::sendAddAllVariables(CContextClient* client) 986 1081 { 987 1082 std::vector<CVariable*> allVar = getAllVariables(); … … 991 1086 for (; it != itE; ++it) 992 1087 { 993 this->sendAddVariable((*it)->getId()); 994 (*it)->sendAllAttributesToServer(); 995 (*it)->sendValue(); 996 } 997 } 998 999 /*! 1000 \brief Send a message to create a variable on server side 1001 A variable always belongs to a variable group 1002 \param[in] id String identity of variable that will be created on server 1003 */ 1004 void CFile::sendAddVariable(const string& id) 1005 { 1006 CContext* context = CContext::getCurrent(); 1007 1008 if (! context->hasServer ) 1009 { 1010 CContextClient* client = context->client; 1011 1012 CEventClient event(this->getType(),EVENT_ID_ADD_VARIABLE); 1013 if (client->isServerLeader()) 1014 { 1015 CMessage msg; 1016 msg << this->getId(); 1017 msg << id; 1018 const std::list<int>& ranks = client->getRanksServerLeader(); 1019 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 1020 event.push(*itRank,1,msg); 1021 client->sendEvent(event); 1022 } 1023 else client->sendEvent(event); 1024 } 1025 1088 this->sendAddVariable((*it)->getId(), client); 1089 (*it)->sendAllAttributesToServer(client); 1090 (*it)->sendValue(client); 1091 } 1026 1092 } 1027 1093 … … 1029 1095 \brief Send a message to create a variable group on server side 1030 1096 \param[in] id String identity of variable group that will be created on server 1031 */ 1032 void CFile::sendAddVariableGroup(const string& id) 1033 { 1034 CContext* context = CContext::getCurrent(); 1035 if (! context->hasServer ) 1036 { 1037 CContextClient* client = context->client; 1038 1039 CEventClient event(this->getType(),EVENT_ID_ADD_VARIABLE_GROUP); 1040 if (client->isServerLeader()) 1041 { 1042 CMessage msg; 1043 msg << this->getId(); 1044 msg << id; 1045 const std::list<int>& ranks = client->getRanksServerLeader(); 1046 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 1047 event.push(*itRank,1,msg); 1048 client->sendEvent(event); 1049 } 1050 else client->sendEvent(event); 1051 } 1052 1097 \param [in] client client to which we will send this adding action 1098 */ 1099 void CFile::sendAddVariableGroup(const string& id, CContextClient* client) 1100 { 1101 sendAddItem(id, (int)EVENT_ID_ADD_VARIABLE_GROUP, client); 1102 } 1103 1104 /* 1105 Send message to add a variable into a file within a certain client 1106 \param [in] id String identity of a variable 1107 \param [in] client client to which we will send this adding action 1108 */ 1109 void CFile::sendAddVariable(const string& id, CContextClient* client) 1110 { 1111 sendAddItem(id, (int)EVENT_ID_ADD_VARIABLE, client); 1053 1112 } 1054 1113 … … 1108 1167 Remark: This function must be called AFTER all active (enabled) files have been created on the server side 1109 1168 */ 1110 void CFile::sendEnabledFields( )1169 void CFile::sendEnabledFields(CContextClient* client) 1111 1170 { 1112 1171 size_t size = this->enabledFields.size(); … … 1114 1173 { 1115 1174 CField* field = this->enabledFields[i]; 1116 this->sendAddField(field->getId()); 1117 field->checkAttributes(); 1118 field->sendAllAttributesToServer(); 1119 field->sendAddAllVariables(); 1120 } 1121 } 1175 this->sendAddField(field->getId(), client); 1176 field->checkTimeAttributes(); 1177 field->sendAllAttributesToServer(client); 1178 field->sendAddAllVariables(client); 1179 } 1180 } 1181 1122 1182 1123 1183 /*!
Note: See TracChangeset
for help on using the changeset viewer.