Changeset 1875 for XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp
- Timestamp:
- 05/12/20 11:52:13 (4 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp
r1872 r1875 708 708 } 709 709 710 for (auto it=couplerClient_.begin(); it!=couplerClient_.end(); ++it) 711 { 712 if (!finalized) it->second->checkBuffers(); 713 } 714 715 for (auto it=couplerServer_.begin(); it!=couplerServer_.end(); ++it) 716 { 717 if (!finalized) it->second->eventLoop(enableEventsProcessing); 718 } 719 710 for (auto couplerOut : couplerOutClient_) 711 if (!finalized) couplerOut.second->checkBuffers(); 712 713 for (auto couplerIn : couplerInClient_) 714 if (!finalized) couplerIn.second->checkBuffers(); 715 716 for (auto couplerOut : couplerOutServer_) 717 if (!finalized) couplerOut.second->eventLoop(enableEventsProcessing); 718 719 for (auto couplerIn : couplerInServer_) 720 if (!finalized) couplerIn.second->eventLoop(enableEventsProcessing); 721 720 722 if (server!=nullptr) if (!finalized) finished &= server->eventLoop(enableEventsProcessing); 721 723 … … 723 725 } 724 726 725 void CContext::addCouplingChanel(const std::string& context, bool out)727 void CContext::addCouplingChanel(const std::string& fullContextId, bool out) 726 728 { 727 vector<string> vectStr=splitRegex(context,"::") ;728 string poolId=vectStr[0] ;729 string serviceId=poolId ;730 string contextId=vectStr[1] ;731 732 729 int contextLeader ; 733 int type = CServicesManager::CLIENT ;734 string contextName=CXios::getContextsManager()->getServerContextName(poolId, serviceId, 0, type, contextId) ;735 730 736 if (couplerClient_.find(contextName)==couplerClient_.end()) 737 { 738 bool ok=CXios::getContextsManager()->getContextLeader(contextName, contextLeader, getIntraComm()) ; 731 if (out) 732 { 733 if (couplerOutClient_.find(fullContextId)==couplerOutClient_.end()) 734 { 735 bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ; 739 736 740 MPI_Comm interComm, interCommClient, interCommServer ; 741 MPI_Comm intraCommClient, intraCommServer ; 742 743 if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ; 744 745 MPI_Comm_dup(intraComm_, &intraCommClient) ; 746 MPI_Comm_dup(intraComm_, &intraCommServer) ; 747 if (out) 748 { 749 MPI_Comm_dup(interComm, &interCommClient) ; 750 MPI_Comm_dup(interComm, &interCommServer) ; 751 CContextClient* client = new CContextClient(this, intraCommClient, interCommClient); 752 CContextServer* server = new CContextServer(this, intraCommServer, interCommServer); 753 client->setAssociatedServer(server) ; 754 server->setAssociatedClient(client) ; 755 } 756 else 757 { 758 MPI_Comm_dup(interComm, &interCommServer) ; 759 MPI_Comm_dup(interComm, &interCommClient) ; 760 CContextServer* server = new CContextServer(this, intraCommServer, interCommServer); 761 CContextClient* client = new CContextClient(this, intraCommClient, interCommClient); 762 client->setAssociatedServer(server) ; 763 server->setAssociatedClient(client) ; 764 } 765 MPI_Comm_free(&interComm) ; 766 767 737 MPI_Comm interComm, interCommClient, interCommServer ; 738 MPI_Comm intraCommClient, intraCommServer ; 739 740 if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ; 741 742 MPI_Comm_dup(intraComm_, &intraCommClient) ; 743 MPI_Comm_dup(intraComm_, &intraCommServer) ; 744 MPI_Comm_dup(interComm, &interCommClient) ; 745 MPI_Comm_dup(interComm, &interCommServer) ; 746 CContextClient* client = new CContextClient(this, intraCommClient, interCommClient); 747 CContextServer* server = new CContextServer(this, intraCommServer, interCommServer); 748 client->setAssociatedServer(server) ; 749 server->setAssociatedClient(client) ; 750 MPI_Comm_free(&interComm) ; 751 couplerOutClient_[fullContextId] = client ; 752 couplerOutServer_[fullContextId] = server ; 753 754 /* 768 755 // for now, we don't now which beffer size must be used for client coupler 769 756 // It will be evaluated later. Fix a constant size for now... … … 777 764 778 765 client->setBufferSize(bufferSize, maxEventSize); 779 780 couplerClient_[contextName] = client ; 781 couplerServer_[contextName] = server ; 782 } 766 */ 767 } 768 } 769 else if (couplerInClient_.find(fullContextId)==couplerInClient_.end()) 770 { 771 bool ok=CXios::getContextsManager()->getContextLeader(fullContextId, contextLeader, getIntraComm()) ; 772 773 MPI_Comm interComm, interCommClient, interCommServer ; 774 MPI_Comm intraCommClient, intraCommServer ; 775 776 if (ok) MPI_Intercomm_create(getIntraComm(), 0, CXios::getXiosComm(), contextLeader, 0, &interComm) ; 777 778 MPI_Comm_dup(intraComm_, &intraCommClient) ; 779 MPI_Comm_dup(intraComm_, &intraCommServer) ; 780 MPI_Comm_dup(interComm, &interCommServer) ; 781 MPI_Comm_dup(interComm, &interCommClient) ; 782 CContextServer* server = new CContextServer(this, intraCommServer, interCommServer); 783 CContextClient* client = new CContextClient(this, intraCommClient, interCommClient); 784 client->setAssociatedServer(server) ; 785 server->setAssociatedClient(client) ; 786 MPI_Comm_free(&interComm) ; 787 788 map<int,size_t> bufferSize, maxEventSize ; 789 for(int i=0;i<client->getRemoteSize();i++) 790 { 791 bufferSize[i]=10000000 ; 792 maxEventSize[i]=10000000 ; 793 } 794 795 client->setBufferSize(bufferSize, maxEventSize); 796 couplerInClient_[fullContextId] = client ; 797 couplerInServer_[fullContextId] = server ; 798 } 783 799 } 784 800 … … 798 814 if (serviceType_==CServicesManager::CLIENT) 799 815 { 800 doPreTimestepOperationsForEnabledReadModeFiles(); // For now we only use server level 1 to read data 816 //ym doPreTimestepOperationsForEnabledReadModeFiles(); // For now we only use server level 1 to read data 817 818 triggerLateFields() ; 819 820 // inform couplerIn that I am finished 821 for(auto& couplerInClient : couplerInClient_) sendCouplerInContextFinalized(couplerInClient.second) ; 822 823 // wait until received message from couplerOut that they have finished 824 bool couplersInFinalized ; 825 do 826 { 827 couplersInFinalized=true ; 828 for(auto& couplerOutClient : couplerOutClient_) couplersInFinalized &= isCouplerInContextFinalized(couplerOutClient.second) ; 829 globalEventLoop() ; 830 } while (!couplersInFinalized) ; 801 831 802 832 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; … … 1055 1085 vector<CField*>&& fileOutField = findAllEnabledFieldsInFileOut(this->enabledWriteModeFiles); 1056 1086 vector<CField*>&& fileInField = findAllEnabledFieldsInFileIn(this->enabledReadModeFiles); 1057 vector<CField*>&& CouplerOutField = findAllEnabledFieldsCouplerOut(this->enabledCouplerOut);1058 vector<CField*>&& CouplerInField = findAllEnabledFieldsCouplerIn(this->enabledCouplerIn);1087 vector<CField*>&& couplerOutField = findAllEnabledFieldsCouplerOut(this->enabledCouplerOut); 1088 vector<CField*>&& couplerInField = findAllEnabledFieldsCouplerIn(this->enabledCouplerIn); 1059 1089 findFieldsWithReadAccess(); 1060 1090 vector<CField*>& fieldWithReadAccess = fieldsWithReadAccess_ ; … … 1073 1103 } 1074 1104 1075 1076 1105 1106 for (auto& field : couplerInField) 1107 { 1108 field->unsetGridCompleted() ; 1109 } 1077 1110 // find all field potentially at workflow end 1078 1111 vector<CField*> endWorkflowFields ; 1079 endWorkflowFields.reserve(fileOutField.size()+ CouplerOutField.size()+fieldWithReadAccess.size()) ;1112 endWorkflowFields.reserve(fileOutField.size()+couplerOutField.size()+fieldWithReadAccess.size()) ; 1080 1113 endWorkflowFields.insert(endWorkflowFields.end(),fileOutField.begin(), fileOutField.end()) ; 1081 endWorkflowFields.insert(endWorkflowFields.end(), CouplerOutField.begin(), CouplerOutField.end()) ;1114 endWorkflowFields.insert(endWorkflowFields.end(),couplerOutField.begin(), couplerOutField.end()) ; 1082 1115 endWorkflowFields.insert(endWorkflowFields.end(),fieldWithReadAccess.begin(), fieldWithReadAccess.end()) ; 1083 1116 1084 for(auto endWorkflowField : endWorkflowFields) endWorkflowField->buildWorkflowGraph(garbageCollector) ; 1117 bool workflowGraphIsCompleted ; 1118 1119 bool first=true ; 1120 do 1121 { 1122 workflowGraphIsCompleted=true; 1123 for(auto endWorkflowField : endWorkflowFields) 1124 { 1125 workflowGraphIsCompleted &= endWorkflowField->buildWorkflowGraph(garbageCollector) ; 1126 } 1127 1128 for(auto couplerIn : enabledCouplerIn) couplerIn->assignContext() ; 1129 for(auto field : couplerInField) field->makeGridAliasForCoupling(); 1130 for(auto field : couplerInField) this->sendCouplerInReady(field->getContextClient()) ; 1085 1131 1132 1133 // assign context to coupler out and related fields 1134 for(auto couplerOut : enabledCouplerOut) couplerOut->assignContext() ; 1135 // for now supose that all coupling out endpoint are succesfull. The difficultie is client/server buffer evaluation 1136 for(auto field : couplerOutField) 1137 { 1138 field->computeGridIndexToFileServer() ; // same kind of index than for file server -> in future distribution may change 1139 } 1140 if (first) setClientServerBuffer(couplerOutField, true) ; // set buffer context 1141 1142 bool couplersReady ; 1143 do 1144 { 1145 couplersReady=true ; 1146 for(auto field : couplerOutField) 1147 { 1148 bool ready = isCouplerInReady(field->getContextClient()) ; 1149 if (ready) field->sendFieldToCouplerOut() ; 1150 couplersReady &= ready ; 1151 } 1152 if (!couplersReady) this->eventLoop() ; 1153 } while (!couplersReady) ; 1154 1155 first=false ; 1156 this->eventLoop() ; 1157 } while (!workflowGraphIsCompleted) ; 1158 1159 for( auto field : couplerInField) couplerInFields_.push_back(field) ; 1160 1086 1161 // get all field coming potentially from model 1087 1162 for (auto field : CField::getAll() ) if (field->getModelIn()) fieldModelIn.push_back(field) ; … … 1101 1176 field->computeGridIndexToFileServer() ; // compute grid index for transfer to the server context 1102 1177 } 1103 setClientServerBuffer(fileOutField, true) ; // set context1178 setClientServerBuffer(fileOutField, true) ; // set buffer context 1104 1179 for(auto field : fileOutField) field->sendFieldToFileServer() ; 1105 1180 } … … 1146 1221 } 1147 1222 1223 1224 for(auto field : couplerInField) 1225 { 1226 field->connectToCouplerIn(garbageCollector) ; // connect the field to server filter 1227 } 1228 1229 1230 for(auto field : couplerOutField) 1231 { 1232 field->connectToCouplerOut(garbageCollector) ; // for now the same kind of filter that for file server 1233 } 1234 1148 1235 // workflow startpoint => data from server on client side 1149 1236 if (serviceType_==CServicesManager::CLIENT) … … 1151 1238 for(auto field : fileInField) 1152 1239 { 1153 field->connectToServerInput(garbageCollector) ; // connect t he field to server filter1240 field->connectToServerInput(garbageCollector) ; // connect tFhe field to server filter 1154 1241 field->computeGridIndexToFileServer() ; // compute grid index for transfer to the server context 1155 1242 field->sendFieldToInputFileServer() ; … … 1162 1249 // no filter for reading data from file => to be implemented 1163 1250 } 1164 1165 1166 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) this->sendCloseDefinition(); 1251 1252 // construct slave server list 1253 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) 1254 { 1255 for(auto field : fileOutField) slaveServers_.insert(field->getContextClient()) ; 1256 for(auto field : fileInField) slaveServers_.insert(field->getContextClient()) ; 1257 } 1258 1259 for(auto& slaveServer : slaveServers_) sendCloseDefinition(slaveServer) ; 1260 1167 1261 if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 1168 1262 { … … 1172 1266 if (serviceType_==CServicesManager::CLIENT) startPrefetchingOfEnabledReadModeFiles(); 1173 1267 1174 1175 1268 // send signal to couplerIn context that definition phasis is done 1269 1270 for(auto& couplerInClient : couplerInClient_) sendCouplerInCloseDefinition(couplerInClient.second) ; 1271 1272 // wait until all couplerIn signal that closeDefition is done. 1273 bool ok; 1274 do 1275 { 1276 ok = true ; 1277 for(auto& couplerOutClient : couplerOutClient_) ok &= isCouplerInCloseDefinition(couplerOutClient.second) ; 1278 this->eventLoop() ; 1279 } while (!ok) ; 1176 1280 1177 1281 return ; 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1178 1292 // For now, only read files with client and only one level server 1179 1293 // if (hasClient && !hasServer) findEnabledReadModeFiles(); … … 1987 2101 recvPostProcessing(event); 1988 2102 return true; 1989 2103 case EVENT_ID_SEND_REGISTRY: 1990 2104 recvRegistry(event); 1991 2105 return true; 1992 2106 break; 1993 2107 case EVENT_ID_POST_PROCESS_GLOBAL_ATTRIBUTES: 1994 2108 recvPostProcessingGlobalAttributes(event); 1995 2109 return true; 1996 2110 break; 1997 2111 case EVENT_ID_PROCESS_GRID_ENABLED_FIELDS: 1998 2112 recvProcessingGridOfEnabledFields(event); 1999 2113 return true; 2000 2114 break; 2115 case EVENT_ID_COUPLER_IN_READY: 2116 recvCouplerInReady(event); 2117 return true; 2118 break; 2119 case EVENT_ID_COUPLER_IN_CLOSE_DEFINITION: 2120 recvCouplerInCloseDefinition(event); 2121 return true; 2122 break; 2123 case EVENT_ID_COUPLER_IN_CONTEXT_FINALIZED: 2124 recvCouplerInContextFinalized(event); 2125 return true; 2126 break; 2001 2127 default : 2002 2128 ERROR("bool CContext::dispatchEvent(CEventServer& event)", … … 2009 2135 2010 2136 //! Client side: Send a message to server to make it close 2137 // ym obsolete 2011 2138 void CContext::sendCloseDefinition(void) 2012 2139 TRY … … 2035 2162 } 2036 2163 CATCH_DUMP_ATTR 2164 2165 // ! Client side: Send a message to server to make it close 2166 void CContext::sendCloseDefinition(CContextClient* client) 2167 TRY 2168 { 2169 if (sendCloseDefinition_done_.count(client)!=0) return ; 2170 else sendCloseDefinition_done_.insert(client) ; 2171 2172 CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION); 2173 if (client->isServerLeader()) 2174 { 2175 CMessage msg; 2176 for(auto rank : client->getRanksServerLeader()) event.push(rank,1,msg); 2177 client->sendEvent(event); 2178 } 2179 else client->sendEvent(event); 2180 } 2181 CATCH_DUMP_ATTR 2037 2182 2038 2183 //! Server side: Receive a message of client announcing a context close … … 2049 2194 TRY 2050 2195 { 2051 int nbSrvPools ; 2052 if (serviceType_==CServicesManager::CLIENT) nbSrvPools = 1 ; 2053 else if (serviceType_==CServicesManager::GATHERER) nbSrvPools = this->clientPrimServer.size() ; 2054 else nbSrvPools = 0 ; 2055 CContextClient* contextClientTmp ; 2056 2057 for (int i = 0; i < nbSrvPools; ++i) 2058 { 2059 if (serviceType_==CServicesManager::CLIENT) contextClientTmp = client ; 2060 else if (serviceType_==CServicesManager::GATHERER ) contextClientTmp = clientPrimServer[i] ; 2061 CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR); 2062 2063 if (contextClientTmp->isServerLeader()) 2064 { 2065 CMessage msg; 2066 msg<<step; 2067 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 2068 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 2069 event.push(*itRank,1,msg); 2070 contextClientTmp->sendEvent(event); 2071 } 2072 else contextClientTmp->sendEvent(event); 2196 CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR); 2197 for(auto client : slaveServers_) 2198 { 2199 if (client->isServerLeader()) 2200 { 2201 CMessage msg; 2202 msg<<step; 2203 for (auto& rank : client->getRanksServerLeader() ) event.push(rank,1,msg); 2204 client->sendEvent(event); 2205 } 2206 else client->sendEvent(event); 2073 2207 } 2074 2208 } … … 2580 2714 CATCH_DUMP_ATTR 2581 2715 2716 void CContext::triggerLateFields(void) 2717 TRY 2718 { 2719 for(auto& field : fileInFields_) field->triggerLateField() ; 2720 for(auto& field : couplerInFields_) field->triggerLateField() ; 2721 } 2722 CATCH_DUMP_ATTR 2723 2582 2724 //! Update calendar in each time step 2583 2725 void CContext::updateCalendar(int step) … … 2590 2732 if (serviceType_==CServicesManager::CLIENT) // For now we only use server level 1 to read data 2591 2733 { 2592 doPreTimestepOperationsForEnabledReadModeFiles();2734 triggerLateFields(); 2593 2735 } 2594 2736 … … 2771 2913 } 2772 2914 CATCH_DUMP_ATTR 2915 2916 2917 2918 2919 //! Client side: Send a message announcing that context can receive grid definition from coupling 2920 void CContext::sendCouplerInReady(CContextClient* client) 2921 TRY 2922 { 2923 if (sendCouplerInReady_done_.count(client)!=0) return ; 2924 else sendCouplerInReady_done_.insert(client) ; 2925 2926 CEventClient event(getType(),EVENT_ID_COUPLER_IN_READY); 2927 2928 if (client->isServerLeader()) 2929 { 2930 CMessage msg; 2931 msg<<this->getId(); 2932 for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg); 2933 client->sendEvent(event); 2934 } 2935 else client->sendEvent(event); 2936 } 2937 CATCH_DUMP_ATTR 2938 2939 //! Server side: Receive a message announcing that context can send grid definition for context coupling 2940 void CContext::recvCouplerInReady(CEventServer& event) 2941 TRY 2942 { 2943 CBufferIn* buffer=event.subEvents.begin()->buffer; 2944 getCurrent()->recvCouplerInReady(*buffer); 2945 } 2946 CATCH 2947 2948 //! Server side: Receive a message announcing that context can send grid definition for context coupling 2949 void CContext::recvCouplerInReady(CBufferIn& buffer) 2950 TRY 2951 { 2952 string contextId ; 2953 buffer>>contextId; 2954 couplerInReady_.insert(getCouplerOutClient(contextId)) ; 2955 } 2956 CATCH_DUMP_ATTR 2957 2958 2959 2960 2961 2962 //! Client side: Send a message announcing that a coupling context have done it closeDefinition, so data can be sent now. 2963 void CContext::sendCouplerInCloseDefinition(CContextClient* client) 2964 TRY 2965 { 2966 if (sendCouplerInCloseDefinition_done_.count(client)!=0) return ; 2967 else sendCouplerInCloseDefinition_done_.insert(client) ; 2968 2969 CEventClient event(getType(),EVENT_ID_COUPLER_IN_CLOSE_DEFINITION); 2970 2971 if (client->isServerLeader()) 2972 { 2973 CMessage msg; 2974 msg<<this->getId(); 2975 for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg); 2976 client->sendEvent(event); 2977 } 2978 else client->sendEvent(event); 2979 } 2980 CATCH_DUMP_ATTR 2981 2982 //! Server side: Receive a message announcing that a coupling context have done it closeDefinition, so data can be sent now. 2983 void CContext::recvCouplerInCloseDefinition(CEventServer& event) 2984 TRY 2985 { 2986 CBufferIn* buffer=event.subEvents.begin()->buffer; 2987 getCurrent()->recvCouplerInCloseDefinition(*buffer); 2988 } 2989 CATCH 2990 2991 //! Server side: Receive a message announcing that a coupling context have done it closeDefinition, so data can be sent now. 2992 void CContext::recvCouplerInCloseDefinition(CBufferIn& buffer) 2993 TRY 2994 { 2995 string contextId ; 2996 buffer>>contextId; 2997 couplerInCloseDefinition_.insert(getCouplerOutClient(contextId)) ; 2998 } 2999 CATCH_DUMP_ATTR 3000 3001 3002 3003 3004 //! Client side: Send a message announcing that a coupling context have done it contextFinalize, so it can also close it own context. 3005 void CContext::sendCouplerInContextFinalized(CContextClient* client) 3006 TRY 3007 { 3008 if (sendCouplerInContextFinalized_done_.count(client)!=0) return ; 3009 else sendCouplerInContextFinalized_done_.insert(client) ; 3010 3011 CEventClient event(getType(),EVENT_ID_COUPLER_IN_CONTEXT_FINALIZED); 3012 3013 if (client->isServerLeader()) 3014 { 3015 CMessage msg; 3016 msg<<this->getId(); 3017 for (auto& rank : client->getRanksServerLeader()) event.push(rank,1,msg); 3018 client->sendEvent(event); 3019 } 3020 else client->sendEvent(event); 3021 } 3022 CATCH_DUMP_ATTR 3023 3024 //! Server side: Receive a message announcing that a coupling context have done it contextFinalize, so it can also close it own context. 3025 void CContext::recvCouplerInContextFinalized(CEventServer& event) 3026 TRY 3027 { 3028 CBufferIn* buffer=event.subEvents.begin()->buffer; 3029 getCurrent()->recvCouplerInContextFinalized(*buffer); 3030 } 3031 CATCH 3032 3033 //! Server side: Receive a message announcing that a coupling context have done it contextFinalize, so it can also close it own context. 3034 void CContext::recvCouplerInContextFinalized(CBufferIn& buffer) 3035 TRY 3036 { 3037 string contextId ; 3038 buffer>>contextId; 3039 couplerInContextFinalized_.insert(getCouplerOutClient(contextId)) ; 3040 } 3041 CATCH_DUMP_ATTR 2773 3042 2774 3043
Note: See TracChangeset
for help on using the changeset viewer.