Changeset 1139 for XIOS/dev/dev_olga/src
- Timestamp:
- 05/18/17 19:27:26 (7 years ago)
- Location:
- XIOS/dev/dev_olga/src
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_olga/src/context_client.cpp
r1130 r1139 276 276 map<int,CClientBuffer*>::iterator itBuff; 277 277 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 278 { 278 279 delete itBuff->second; 280 } 279 281 buffers.clear(); 280 282 } … … 420 422 //releaseBuffers(); // moved to CContext::finalize() 421 423 } 424 425 /*! 426 * Finalize context client and do some reports. Function is non-blocking. 427 */ 428 void CContextClient::postFinalize(void) 429 { 430 map<int,CClientBuffer*>::iterator itBuff; 431 bool stop = false; 432 433 CTimer::get("Blocking time").resume(); 434 while (hasTemporarilyBufferedEvent()) 435 { 436 checkBuffers(); 437 sendTemporarilyBufferedEvent(); 438 } 439 CTimer::get("Blocking time").suspend(); 440 441 CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_POST_FINALIZE); 442 if (isServerLeader()) 443 { 444 CMessage msg; 445 const std::list<int>& ranks = getRanksServerLeader(); 446 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 447 event.push(*itRank, 1, msg); 448 sendEvent(event); 449 } 450 else sendEvent(event); 451 452 CTimer::get("Blocking time").resume(); 453 // while (!stop) 454 { 455 checkBuffers(); 456 if (hasTemporarilyBufferedEvent()) 457 sendTemporarilyBufferedEvent(); 458 459 stop = true; 460 } 461 CTimer::get("Blocking time").suspend(); 462 463 } 422 464 423 465 /*! -
XIOS/dev/dev_olga/src/context_client.hpp
r1130 r1139 53 53 // void closeContext(void); Never been implemented. 54 54 void finalize(void); 55 void postFinalize(void); 55 56 56 57 void setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize); -
XIOS/dev/dev_olga/src/context_server.cpp
r1130 r1139 225 225 { 226 226 finished=true; 227 // info(20)<<"Server Side context <"<<context->getId()<<"> finalized"<<endl; // moved to CContext::finalize()228 227 std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 229 228 iteMap = mapBufferSize_.end(), itMap; … … 237 236 context->finalize(); 238 237 report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl; 238 } 239 else if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_POST_FINALIZE) 240 { 241 info(20)<<"Server side context <"<<context->getId()<<"> finalized."<<endl; 242 context->postFinalize(); 239 243 } 240 244 else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event); -
XIOS/dev/dev_olga/src/node/context.cpp
r1130 r1139 26 26 : CObjectTemplate<CContext>(), CContextAttributes() 27 27 , calendar(), hasClient(false), hasServer(false) 28 , isPostProcessed(false) //, finalized(false)28 , isPostProcessed(false), finalized(false) 29 29 , idServer_(), client(0), server(0) 30 30 , allProcessed(false), countChildCtx_(0) … … 34 34 : CObjectTemplate<CContext>(id), CContextAttributes() 35 35 , calendar(), hasClient(false), hasServer(false) 36 , isPostProcessed(false) //, finalized(false)36 , isPostProcessed(false), finalized(false) 37 37 , idServer_(), client(0), server(0) 38 38 , allProcessed(false), countChildCtx_(0) … … 45 45 for (std::vector<CContextClient*>::iterator it = clientPrimServer.begin(); it != clientPrimServer.end(); it++) delete *it; 46 46 for (std::vector<CContextServer*>::iterator it = serverPrimServer.begin(); it != serverPrimServer.end(); it++) delete *it; 47 47 48 } 48 49 … … 391 392 else if (CServer::serverLevel == 1) 392 393 { 393 client->checkBuffers(); 394 bool serverFinished = server->eventLoop(); 394 if (!finalized) 395 client->checkBuffers(); 396 bool serverFinished = true; 397 if (!finalized) 398 serverFinished = server->eventLoop(); 395 399 bool serverPrimFinished = true; 396 400 for (int i = 0; i < clientPrimServer.size(); ++i) 397 401 { 398 clientPrimServer[i]->checkBuffers(); 399 serverPrimFinished *= serverPrimServer[i]->eventLoop(); 402 if (!finalized) 403 clientPrimServer[i]->checkBuffers(); 404 if (!finalized) 405 serverPrimFinished *= serverPrimServer[i]->eventLoop(); 400 406 } 401 407 return ( serverFinished && serverPrimFinished); … … 419 425 // (1) blocking send context finalize to its server 420 426 // (2) blocking receive context finalize from its server 427 // (3) send post finalize to its server 421 428 if (CXios::isClient) 422 429 { … … 432 439 while (!server->hasFinished()) 433 440 server->eventLoop(); 441 442 if (CXios::isServer) // Mode attache 443 postFinalize(); 444 else // Mode server 445 client->postFinalize(); 446 434 447 } 435 448 } … … 442 455 clientPrimServer[i]->finalize(); 443 456 444 // (Last) context finalized message received => send context finalize to its parent context 457 // (Last) context finalized message received 458 // Close files, gather registries, send context finalize to its parent context 445 459 if (countChildCtx_ == clientPrimServer.size()) 446 460 client->finalize(); … … 448 462 ++countChildCtx_; 449 463 } 450 451 // If in mode attache call postFinalize452 if (CXios::isServer && CXios::isClient)453 postFinalize();454 464 } 455 465 456 466 /*! 457 467 * \fn void CContext::postFinalize(void) 458 * Close files, gather registries, ,and make deallocations.468 * Close files, gather registries, and make deallocations. 459 469 * Function is called when a context is finalized (it has nothing to receive and nothing to send). 460 470 */ 461 471 void CContext::postFinalize(void) 462 472 { 473 finalized = true; 474 475 // Primary server: blocking send post finalize to secondary servers 476 for (int i = 0; i < clientPrimServer.size(); ++i) 477 clientPrimServer[i]->postFinalize(); 478 bool buffersReleased; 479 do 480 { 481 buffersReleased = true; 482 for (int i = 0; i < clientPrimServer.size(); ++i) 483 { 484 clientPrimServer[i]->checkBuffers(); 485 buffersReleased *= !clientPrimServer[i]->havePendingRequests(); 486 } 487 } while (!buffersReleased); 488 463 489 info(20)<<"Context <"<<getId()<<"> is finalized."<<endl; 464 490 … … 469 495 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 470 496 } 497 498 //! Deallocate client buffers 499 client->releaseBuffers(); 500 for (int i = 0; i < clientPrimServer.size(); ++i) 501 clientPrimServer[i]->releaseBuffers(); 471 502 472 503 //! Free internally allocated communicators … … 475 506 comms.clear(); 476 507 477 //! Deallocate client buffers478 client->releaseBuffers();479 for (int i = 0; i < clientPrimServer.size(); ++i)480 clientPrimServer[i]->releaseBuffers();481 508 } 482 509 … … 1000 1027 void CContext::recvCreateFileHeader(CBufferIn& buffer) 1001 1028 { 1002 // The creation of header file should be delegated to server2, for now1003 // if (hasClient && hasServer)1004 // {1005 // sendCreateFileHeader();1006 // }1007 1008 1029 if (!hasClient && hasServer) 1009 1030 createFileHeader(); … … 1014 1035 { 1015 1036 // Use correct context client to send message 1016 // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1;1017 1037 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 1018 1038 for (int i = 0; i < nbSrvPools; ++i) … … 1151 1171 // Find all enabled fields of each file 1152 1172 this->findAllEnabledFields(); 1153 // this->findAllEnabledFieldsInReadModeFiles();1154 1173 // For now, only read files with client and only one level server 1155 1174 if (hasClient && !hasServer) this->findAllEnabledFieldsInReadModeFiles(); … … 1162 1181 } 1163 1182 1164 // // Only search and rebuild all reference objects of enable fields, don't transform1165 // this->solveOnlyRefOfEnabledFields(false);1166 1167 // // Search and rebuild all reference object of enabled fields1168 // this->solveAllRefOfEnabledFields(false);1169 1170 // // Find all fields with read access from the public API1171 // findFieldsWithReadAccess();1172 // // and solve the all reference for them1173 // solveAllRefOfFieldsWithReadAccess();1174 1175 1183 // Only search and rebuild all reference objects of enable fields, don't transform 1176 1184 this->solveOnlyRefOfEnabledFields(false); 1177 1185 1178 1186 // Search and rebuild all reference object of enabled fields, and transform 1179 // this->solveAllEnabledFields1180 1187 this->solveAllRefOfEnabledFieldsAndTransform(false); 1181 // // Check grid and calculate its distribution1182 // if (hasClient) checkGridEnabledFields();1183 1188 1184 1189 // Find all fields with read access from the public API … … 1585 1590 /*! 1586 1591 * \fn bool CContext::isFinalized(void) 1587 * Context is finalized if: 1588 * (1) it has received all context finalize events 1589 * (2) it has no pending events to send. 1592 * Context is finalized if it received context post finalize event. 1590 1593 */ 1591 1594 bool CContext::isFinalized(void) 1592 1595 { 1593 if (countChildCtx_==clientPrimServer.size()+1) 1594 { 1595 bool buffersReleased = !client->havePendingRequests(); 1596 for (int i = 0; i < clientPrimServer.size(); ++i) 1597 buffersReleased *= !clientPrimServer[i]->havePendingRequests(); 1598 return buffersReleased; 1599 } 1600 else 1601 return false; 1596 return finalized; 1602 1597 } 1603 1598 -
XIOS/dev/dev_olga/src/node/context.hpp
r1130 r1139 51 51 EVENT_ID_CLOSE_DEFINITION,EVENT_ID_UPDATE_CALENDAR, 52 52 EVENT_ID_CREATE_FILE_HEADER,EVENT_ID_CONTEXT_FINALIZE, 53 EVENT_ID_CONTEXT_POST_FINALIZE, 53 54 EVENT_ID_POST_PROCESS, EVENT_ID_SEND_REGISTRY, 54 55 EVENT_ID_POST_PROCESS_GLOBAL_ATTRIBUTES, … … 247 248 bool isPostProcessed; 248 249 bool allProcessed; 249 //bool finalized;250 bool finalized; 250 251 int countChildCtx_; //!< Counter of child contexts (for now it is the number of secondary server pools) 251 252 StdString idServer_; -
XIOS/dev/dev_olga/src/server.cpp
r1133 r1139 319 319 while(!stop) 320 320 { 321 322 321 if (isRoot) 323 322 { … … 588 587 { 589 588 bool isFinalized ; 590 591 589 map<string,CContext*>::iterator it ; 592 590 … … 596 594 if (isFinalized) 597 595 { 598 it->second->postFinalize();596 // it->second->postFinalize(); 599 597 contextList.erase(it) ; 600 598 break ; 601 599 } 602 600 else 603 { 604 isFinalized=it->second->checkBuffersAndListen(); 605 } 601 it->second->checkBuffersAndListen(); 606 602 } 607 603 }
Note: See TracChangeset
for help on using the changeset viewer.