Changeset 1460 for XIOS/dev/branch_openmp/src/node/context.cpp
- Timestamp:
- 03/22/18 10:43:20 (6 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/branch_openmp/src/node/context.cpp
r1342 r1460 1 2 1 #include "context.hpp" 3 2 #include "attribute_template.hpp" … … 17 16 #include "timer.hpp" 18 17 #include "memtrack.hpp" 18 #include <limits> 19 #include <fstream> 20 #include "server.hpp" 21 #include "distribute_file_server2.hpp" 19 22 20 23 using namespace ep_lib; 21 24 22 23 25 namespace xios { 24 26 25 27 boost::shared_ptr<CContextGroup> * CContext::root_ptr = 0; 26 28 27 /// ////////////////////// D finitions ////////////////////// ///29 /// ////////////////////// Définitions ////////////////////// /// 28 30 29 31 CContext::CContext(void) 30 32 : CObjectTemplate<CContext>(), CContextAttributes() 31 , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false) 33 , calendar(), hasClient(false), hasServer(false) 34 , isPostProcessed(false), finalized(false) 32 35 , idServer_(), client(0), server(0) 36 , allProcessed(false), countChildCtx_(0) 33 37 { /* Ne rien faire de plus */ } 34 38 35 39 CContext::CContext(const StdString & id) 36 40 : CObjectTemplate<CContext>(id), CContextAttributes() 37 , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false) 41 , calendar(), hasClient(false), hasServer(false) 42 , isPostProcessed(false), finalized(false) 38 43 , idServer_(), client(0), server(0) 44 , allProcessed(false), countChildCtx_(0) 39 45 { /* Ne rien faire de plus */ } 40 46 … … 43 49 delete client; 44 50 delete server; 51 for (std::vector<CContextClient*>::iterator it = clientPrimServer.begin(); it != clientPrimServer.end(); it++) delete *it; 52 for (std::vector<CContextServer*>::iterator it = serverPrimServer.begin(); it != serverPrimServer.end(); it++) delete *it; 53 45 54 } 46 55 … … 182 191 if (!this->hasChild()) 183 192 { 184 //oss << "<!-- No definition -->" << std::endl; // fait planter l'incr mentation193 //oss << "<!-- No definition -->" << std::endl; // fait planter l'incrémentation 185 194 } 186 195 else … … 243 252 void CContext::initClient(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtServer /*= 0*/) 244 253 { 245 hasClient=true; 246 client = new CContextClient(this,intraComm, interComm, cxtServer); 247 registryIn=new CRegistry(intraComm); 248 registryIn->setPath(getId()) ; 249 if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ; 250 registryIn->bcastRegistry() ; 251 252 registryOut=new CRegistry(intraComm) ; 253 registryOut->setPath(getId()) ; 254 254 255 hasClient = true; 255 256 ep_lib::MPI_Comm intraCommServer, interCommServer; 256 if (cxtServer) // Attached mode 257 { 258 intraCommServer = intraComm; 259 interCommServer = interComm; 257 258 259 if (CServer::serverLevel != 1) 260 // initClient is called by client 261 { 262 client = new CContextClient(this, intraComm, interComm, cxtServer); 263 if (cxtServer) // Attached mode 264 { 265 intraCommServer = intraComm; 266 interCommServer = interComm; 267 } 268 else 269 { 270 ep_lib::MPI_Comm_dup(intraComm, &intraCommServer); 271 comms.push_back(intraCommServer); 272 ep_lib::MPI_Comm_dup(interComm, &interCommServer); 273 comms.push_back(interCommServer); 274 } 275 /* for registry take the id of client context */ 276 /* for servers, supress the _server_ from id */ 277 string contextRegistryId=getId() ; 278 size_t pos=contextRegistryId.find("_server_") ; 279 if (pos!=std::string::npos) contextRegistryId=contextRegistryId.substr(0,pos) ; 280 281 registryIn=new CRegistry(intraComm); 282 registryIn->setPath(contextRegistryId) ; 283 if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ; 284 registryIn->bcastRegistry() ; 285 registryOut=new CRegistry(intraComm) ; 286 287 registryOut->setPath(contextRegistryId) ; 288 289 server = new CContextServer(this, intraCommServer, interCommServer); 260 290 } 261 291 else 262 { 292 // initClient is called by primary server 293 { 294 clientPrimServer.push_back(new CContextClient(this, intraComm, interComm)); 263 295 ep_lib::MPI_Comm_dup(intraComm, &intraCommServer); 264 296 comms.push_back(intraCommServer); 265 297 ep_lib::MPI_Comm_dup(interComm, &interCommServer); 266 298 comms.push_back(interCommServer); 267 } 268 server = new CContextServer(this,intraCommServer,interCommServer); 269 } 270 271 void CContext::setClientServerBuffer() 272 { 273 // Estimated minimum event size for small events (10 is an arbitrary constant just for safety) 299 serverPrimServer.push_back(new CContextServer(this, intraCommServer, interCommServer)); 300 } 301 } 302 303 /*! 304 Sets client buffers. 305 \param [in] contextClient 306 \param [in] bufferForWriting True if buffers are used for sending data for writing 307 This flag is only true for client and server-1 for communication with server-2 308 */ 309 void CContext::setClientServerBuffer(CContextClient* contextClient, bool bufferForWriting) 310 { 311 // Estimated minimum event size for small events (10 is an arbitrary constant just for safety) 274 312 const size_t minEventSize = CEventClient::headerSize + getIdServer().size() + 10 * sizeof(int); 275 // Ensure there is at least some room for 20 of such events in the buffers 276 size_t minBufferSize = std::max(CXios::minBufferSize, 20 * minEventSize); 313 314 // Ensure there is at least some room for 20 of such events in the buffers 315 size_t minBufferSize = std::max(CXios::minBufferSize, 20 * minEventSize); 316 277 317 #define DECLARE_NODE(Name_, name_) \ 278 318 if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition); … … 284 324 // Compute the buffer sizes needed to send the attributes and data corresponding to fields 285 325 std::map<int, StdSize> maxEventSize; 286 std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize );287 std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize );326 std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize, contextClient, bufferForWriting); 327 std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize, contextClient, bufferForWriting); 288 328 289 329 std::map<int, StdSize>::iterator it, ite = dataBufferSize.end(); … … 291 331 if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second; 292 332 293 // Apply the buffer size factor and check that we are above the minimum buffersize333 // Apply the buffer size factor, check that we are above the minimum buffer size and below the maximum size 294 334 ite = bufferSize.end(); 295 335 for (it = bufferSize.begin(); it != ite; ++it) … … 297 337 it->second *= CXios::bufferSizeFactor; 298 338 if (it->second < minBufferSize) it->second = minBufferSize; 339 if (it->second > CXios::maxBufferSize) it->second = CXios::maxBufferSize; 299 340 } 300 341 301 342 // Leaders will have to send some control events so ensure there is some room for those in the buffers 302 if (c lient->isServerLeader())303 { 304 const std::list<int>& ranks = c lient->getRanksServerLeader();343 if (contextClient->isServerLeader()) 344 { 345 const std::list<int>& ranks = contextClient->getRanksServerLeader(); 305 346 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 306 347 { … … 312 353 } 313 354 } 314 315 client->setBufferSize(bufferSize, maxEventSize); 355 contextClient->setBufferSize(bufferSize, maxEventSize); 356 316 357 } 317 358 … … 322 363 } 323 364 324 //! Initialize server325 365 void CContext::initServer(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtClient /*= 0*/) 326 366 { … … 328 368 server = new CContextServer(this,intraComm,interComm); 329 369 370 /* for registry take the id of client context */ 371 /* for servers, supress the _server_ from id */ 372 string contextRegistryId=getId() ; 373 size_t pos=contextRegistryId.find("_server_") ; 374 if (pos!=std::string::npos) contextRegistryId=contextRegistryId.substr(0,pos) ; 375 330 376 registryIn=new CRegistry(intraComm); 331 registryIn->setPath( getId()) ;377 registryIn->setPath(contextRegistryId) ; 332 378 if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ; 333 379 registryIn->bcastRegistry() ; 334 380 registryOut=new CRegistry(intraComm) ; 335 registryOut->setPath( getId()) ;381 registryOut->setPath(contextRegistryId) ; 336 382 337 383 ep_lib::MPI_Comm intraCommClient, interCommClient; … … 352 398 353 399 //! Try to send the buffers and receive possible answers 354 bool CContext::checkBuffersAndListen(void) 355 { 356 client->checkBuffers(); 357 358 bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent(); 359 if (hasTmpBufferedEvent) 360 hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent(); 361 362 // Don't process events if there is a temporarily buffered event 363 return server->eventLoop(!hasTmpBufferedEvent); 364 } 400 bool CContext::checkBuffersAndListen(bool enableEventsProcessing /*= true*/) 401 { 402 bool clientReady, serverFinished; 403 404 // Only classical servers are non-blocking 405 if (CServer::serverLevel == 0) 406 { 407 client->checkBuffers(); 408 bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent(); 409 if (hasTmpBufferedEvent) 410 hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent(); 411 // Don't process events if there is a temporarily buffered event 412 return server->eventLoop(!hasTmpBufferedEvent || !enableEventsProcessing); 413 } 414 else if (CServer::serverLevel == 1) 415 { 416 if (!finalized) 417 client->checkBuffers(); 418 bool serverFinished = true; 419 if (!finalized) 420 serverFinished = server->eventLoop(enableEventsProcessing); 421 bool serverPrimFinished = true; 422 for (int i = 0; i < clientPrimServer.size(); ++i) 423 { 424 if (!finalized) 425 clientPrimServer[i]->checkBuffers(); 426 if (!finalized) 427 serverPrimFinished *= serverPrimServer[i]->eventLoop(enableEventsProcessing); 428 } 429 return ( serverFinished && serverPrimFinished); 430 } 431 432 else if (CServer::serverLevel == 2) 433 { 434 client->checkBuffers(); 435 return server->eventLoop(enableEventsProcessing); 436 } 437 } 365 438 366 439 //! Terminate a context 367 440 void CContext::finalize(void) 368 441 { 369 if ( !finalized)442 if (hasClient && !hasServer) // For now we only use server level 1 to read data 370 443 { 371 finalized = true; 372 if (hasClient) sendRegistry() ; 373 client->finalize(); 374 while (!server->hasFinished()) 375 { 376 server->eventLoop(); 377 } 378 379 if (hasServer) 380 { 381 closeAllFile(); 382 registryOut->hierarchicalGatherRegistry() ; 383 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 384 } 385 386 for (std::list<ep_lib::MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 387 ep_lib::MPI_Comm_free(&(*it)); 388 comms.clear(); 444 doPreTimestepOperationsForEnabledReadModeFiles(); 389 445 } 446 // Send registry upon calling the function the first time 447 if (countChildCtx_ == 0) 448 if (hasClient) sendRegistry() ; 449 450 // Client: 451 // (1) blocking send context finalize to its server 452 // (2) blocking receive context finalize from its server 453 // (3) some memory deallocations 454 if (CXios::isClient) 455 { 456 // Make sure that client (model) enters the loop only once 457 if (countChildCtx_ < 1) 458 { 459 ++countChildCtx_; 460 461 client->finalize(); 462 while (client->havePendingRequests()) 463 client->checkBuffers(); 464 465 while (!server->hasFinished()) 466 server->eventLoop(); 467 468 if (hasServer) // Mode attache 469 { 470 closeAllFile(); 471 registryOut->hierarchicalGatherRegistry() ; 472 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 473 } 474 475 //! Deallocate client buffers 476 client->releaseBuffers(); 477 478 //! Free internally allocated communicators 479 for (std::list<ep_lib::MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 480 ep_lib::MPI_Comm_free(&(*it)); 481 comms.clear(); 482 483 #pragma omp critical (_output) 484 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 485 } 486 } 487 else if (CXios::isServer) 488 { 489 // First context finalize message received from a model 490 // Send context finalize to its child contexts (if any) 491 if (countChildCtx_ == 0) 492 for (int i = 0; i < clientPrimServer.size(); ++i) 493 clientPrimServer[i]->finalize(); 494 495 // (Last) context finalized message received 496 if (countChildCtx_ == clientPrimServer.size()) 497 { 498 // Blocking send of context finalize message to its client (e.g. primary server or model) 499 #pragma omp critical (_output) 500 info(100)<<"DEBUG: context "<<getId()<<" Send client finalize<<"<<endl ; 501 client->finalize(); 502 bool bufferReleased; 503 do 504 { 505 client->checkBuffers(); 506 bufferReleased = !client->havePendingRequests(); 507 } while (!bufferReleased); 508 finalized = true; 509 510 closeAllFile(); // Just move to here to make sure that server-level 1 can close files 511 if (hasServer && !hasClient) 512 { 513 registryOut->hierarchicalGatherRegistry() ; 514 if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 515 } 516 517 //! Deallocate client buffers 518 client->releaseBuffers(); 519 for (int i = 0; i < clientPrimServer.size(); ++i) 520 clientPrimServer[i]->releaseBuffers(); 521 522 //! Free internally allocated communicators 523 for (std::list<ep_lib::MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 524 ep_lib::MPI_Comm_free(&(*it)); 525 comms.clear(); 526 527 #pragma omp critical (_output) 528 info(20)<<"CContext: Context <"<<getId()<<"> is finalized."<<endl; 529 } 530 531 ++countChildCtx_; 532 } 533 } 534 535 //! Free internally allocated communicators 536 void CContext::freeComms(void) 537 { 538 for (std::list<ep_lib::MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 539 ep_lib::MPI_Comm_free(&(*it)); 540 comms.clear(); 541 } 542 543 //! Deallocate buffers allocated by clientContexts 544 void CContext::releaseClientBuffers(void) 545 { 546 client->releaseBuffers(); 547 for (int i = 0; i < clientPrimServer.size(); ++i) 548 clientPrimServer[i]->releaseBuffers(); 549 } 550 551 void CContext::postProcessingGlobalAttributes() 552 { 553 if (allProcessed) return; 554 555 // After xml is parsed, there are some more works with post processing 556 postProcessing(); 557 558 // Check grid and calculate its distribution 559 checkGridEnabledFields(); 560 561 // Distribute files between secondary servers according to the data size 562 distributeFiles(); 563 564 setClientServerBuffer(client, (hasClient && !hasServer)); 565 for (int i = 0; i < clientPrimServer.size(); ++i) 566 setClientServerBuffer(clientPrimServer[i], true); 567 568 if (hasClient) 569 { 570 // Send all attributes of current context to server 571 this->sendAllAttributesToServer(); 572 573 // Send all attributes of current calendar 574 CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer(); 575 576 // We have enough information to send to server 577 // First of all, send all enabled files 578 sendEnabledFiles(this->enabledWriteModeFiles); 579 // We only use server-level 1 (for now) to read data 580 if (!hasServer) 581 sendEnabledFiles(this->enabledReadModeFiles); 582 583 // Then, send all enabled fields 584 sendEnabledFieldsInFiles(this->enabledWriteModeFiles); 585 if (!hasServer) 586 sendEnabledFieldsInFiles(this->enabledReadModeFiles); 587 588 // Then, check whether we have domain_ref, axis_ref or scalar_ref attached to the enabled fields 589 // If any, so send them to server 590 sendRefDomainsAxisScalars(this->enabledWriteModeFiles); 591 if (!hasServer) 592 sendRefDomainsAxisScalars(this->enabledReadModeFiles); 593 594 // Check whether enabled fields have grid_ref, if any, send this info to server 595 sendRefGrid(this->enabledFiles); 596 // This code may be useful in the future when we want to seperate completely read and write 597 // sendRefGrid(this->enabledWriteModeFiles); 598 // if (!hasServer) 599 // sendRefGrid(this->enabledReadModeFiles); 600 601 // A grid of enabled fields composed of several components which must be checked then their 602 // checked attributes should be sent to server 603 sendGridComponentEnabledFieldsInFiles(this->enabledFiles); // This code can be seperated in two (one for reading, another for writing) 604 605 // We have a xml tree on the server side and now, it should be also processed 606 sendPostProcessing(); 607 608 // Finally, we send information of grid itself to server 609 sendGridEnabledFieldsInFiles(this->enabledWriteModeFiles); 610 if (!hasServer) 611 sendGridEnabledFieldsInFiles(this->enabledReadModeFiles); 612 } 613 allProcessed = true; 614 } 615 616 void CContext::sendPostProcessingGlobalAttributes() 617 { 618 // Use correct context client to send message 619 // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1; 620 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 621 for (int i = 0; i < nbSrvPools; ++i) 622 { 623 CContextClient* contextClientTmp = (0 != clientPrimServer.size()) ? clientPrimServer[i] : client; 624 CEventClient event(getType(),EVENT_ID_POST_PROCESS_GLOBAL_ATTRIBUTES); 625 626 if (contextClientTmp->isServerLeader()) 627 { 628 CMessage msg; 629 if (hasServer) 630 msg<<this->getIdServer(i); 631 else 632 msg<<this->getIdServer(); 633 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 634 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 635 event.push(*itRank,1,msg); 636 contextClientTmp->sendEvent(event); 637 } 638 else contextClientTmp->sendEvent(event); 639 } 640 } 641 642 void CContext::recvPostProcessingGlobalAttributes(CEventServer& event) 643 { 644 CBufferIn* buffer=event.subEvents.begin()->buffer; 645 string id; 646 *buffer>>id; 647 get(id)->recvPostProcessingGlobalAttributes(*buffer); 648 } 649 650 void CContext::recvPostProcessingGlobalAttributes(CBufferIn& buffer) 651 { 652 postProcessingGlobalAttributes(); 390 653 } 391 654 … … 401 664 void CContext::closeDefinition(void) 402 665 { 403 CTimer::get("Context : close definition").resume() ; 404 // There is nothing client need to send to server 405 if (hasClient) 406 { 407 // After xml is parsed, there are some more works with post processing 408 postProcessing(); 409 } 410 setClientServerBuffer(); 411 666 CTimer::get("Context : close definition").resume() ; 667 postProcessingGlobalAttributes(); 668 669 if (hasClient) sendPostProcessingGlobalAttributes(); 670 671 // There are some processings that should be done after all of above. For example: check mask or index 672 this->buildFilterGraphOfEnabledFields(); 673 412 674 if (hasClient && !hasServer) 413 { 414 // Send all attributes of current context to server 415 this->sendAllAttributesToServer(); 416 417 // Send all attributes of current calendar 418 CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer(); 419 420 // We have enough information to send to server 421 // First of all, send all enabled files 422 sendEnabledFiles(); 423 424 // Then, send all enabled fields 425 sendEnabledFields(); 426 427 // At last, we have all info of domain and axis, then send them 428 sendRefDomainsAxis(); 429 430 // After that, send all grid (if any) 431 sendRefGrid(); 675 { 676 buildFilterGraphOfFieldsWithReadAccess(); 677 postProcessFilterGraph(); 432 678 } 433 434 // We have a xml tree on the server side and now, it should be also processed 435 if (hasClient && !hasServer) sendPostProcessing(); 436 437 // There are some processings that should be done after all of above. For example: check mask or index 438 if (hasClient) 439 { 440 this->buildFilterGraphOfEnabledFields(); 441 buildFilterGraphOfFieldsWithReadAccess(); 442 this->solveAllRefOfEnabledFields(true); 443 } 444 445 // Now tell server that it can process all messages from client 446 if (hasClient && !hasServer) this->sendCloseDefinition(); 679 680 checkGridEnabledFields(); 681 682 if (hasClient) this->sendProcessingGridOfEnabledFields(); 683 if (hasClient) this->sendCloseDefinition(); 447 684 448 685 // Nettoyage de l'arborescence 449 if (hasClient && !hasServer) CleanTree(); // Only on client side??686 if (hasClient) CleanTree(); // Only on client side?? 450 687 451 688 if (hasClient) 452 689 { 453 690 sendCreateFileHeader(); 454 455 startPrefetchingOfEnabledReadModeFiles(); 691 if (!hasServer) startPrefetchingOfEnabledReadModeFiles(); 456 692 } 457 693 CTimer::get("Context : close definition").suspend() ; 458 694 } 459 695 460 void CContext::findAllEnabledFields(void) 461 { 462 for (unsigned int i = 0; i < this->enabledFiles.size(); i++) 463 (void)this->enabledFiles[i]->getEnabledFields(); 464 } 465 466 void CContext::findAllEnabledFieldsInReadModeFiles(void) 467 { 468 for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i) 469 (void)this->enabledReadModeFiles[i]->getEnabledFields(); 696 void CContext::findAllEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles) 697 { 698 for (unsigned int i = 0; i < activeFiles.size(); i++) 699 (void)activeFiles[i]->getEnabledFields(); 470 700 } 471 701 … … 476 706 } 477 707 708 void CContext::sendGridComponentEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles) 709 { 710 int size = activeFiles.size(); 711 for (int i = 0; i < size; ++i) 712 { 713 activeFiles[i]->sendGridComponentOfEnabledFields(); 714 } 715 } 716 717 /*! 718 Send active (enabled) fields in file from a client to others 719 \param [in] activeFiles files contains enabled fields to send 720 */ 721 void CContext::sendGridEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles) 722 { 723 int size = activeFiles.size(); 724 for (int i = 0; i < size; ++i) 725 { 726 activeFiles[i]->sendGridOfEnabledFields(); 727 } 728 } 729 730 void CContext::checkGridEnabledFields() 731 { 732 int size = enabledFiles.size(); 733 for (int i = 0; i < size; ++i) 734 { 735 enabledFiles[i]->checkGridOfEnabledFields(); 736 } 737 } 738 739 /*! 740 Check grid of active (enabled) fields in file 741 \param [in] activeFiles files contains enabled fields whose grid needs checking 742 */ 743 void CContext::checkGridEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles) 744 { 745 int size = activeFiles.size(); 746 for (int i = 0; i < size; ++i) 747 { 748 activeFiles[i]->checkGridOfEnabledFields(); 749 } 750 } 751 752 /*! 753 Go up the hierachical tree via field_ref and do check of attributes of fields 754 This can be done in a client then all computed information will be sent from this client to others 755 \param [in] sendToServer Flag to indicate whether calculated information will be sent 756 */ 478 757 void CContext::solveOnlyRefOfEnabledFields(bool sendToServer) 479 758 { … … 490 769 } 491 770 492 void CContext::solveAllRefOfEnabledFields(bool sendToServer) 771 /*! 772 Go up the hierachical tree via field_ref and do check of attributes of fields. 773 The transformation can be done in this step. 774 All computed information will be sent from this client to others. 775 \param [in] sendToServer Flag to indicate whether calculated information will be sent 776 */ 777 void CContext::solveAllRefOfEnabledFieldsAndTransform(bool sendToServer) 493 778 { 494 779 int size = this->enabledFiles.size(); 495 780 for (int i = 0; i < size; ++i) 496 781 { 497 this->enabledFiles[i]->solveAllRefOfEnabledFields (sendToServer);782 this->enabledFiles[i]->solveAllRefOfEnabledFieldsAndTransform(sendToServer); 498 783 } 499 784 } … … 508 793 } 509 794 795 void CContext::postProcessFilterGraph() 796 { 797 int size = enabledFiles.size(); 798 for (int i = 0; i < size; ++i) 799 { 800 enabledFiles[i]->postProcessFilterGraph(); 801 } 802 } 803 510 804 void CContext::startPrefetchingOfEnabledReadModeFiles() 511 805 { … … 514 808 { 515 809 enabledReadModeFiles[i]->prefetchEnabledReadModeFields(); 810 } 811 } 812 813 void CContext::doPreTimestepOperationsForEnabledReadModeFiles() 814 { 815 int size = enabledReadModeFiles.size(); 816 for (int i = 0; i < size; ++i) 817 { 818 enabledReadModeFiles[i]->doPreTimestepOperationsForEnabledReadModeFields(); 516 819 } 517 820 } … … 553 856 } 554 857 555 void CContext::solveAllInheritance(bool apply) // default : apply = true556 { 557 // R solution des hritages descendants (cd des hritages de groupes)858 void CContext::solveAllInheritance(bool apply) 859 { 860 // Résolution des héritages descendants (cà d des héritages de groupes) 558 861 // pour chacun des contextes. 559 862 solveDescInheritance(apply); 560 863 561 // R solution des hritages par rfrence au niveau des fichiers.864 // Résolution des héritages par référence au niveau des fichiers. 562 865 const vector<CFile*> allFiles=CFile::getAll(); 563 866 const vector<CGrid*> allGrids= CGrid::getAll(); 564 867 565 //if (hasClient && !hasServer)566 if (hasClient)868 if (hasClient && !hasServer) 869 //if (hasClient) 567 870 { 568 871 for (unsigned int i = 0; i < allFiles.size(); i++) … … 583 886 584 887 for (unsigned int i = 0; i < allFiles.size(); i++) 585 if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est d fini.888 if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini. 586 889 { 587 if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fix 890 if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai. 588 891 { 589 892 if ((initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep())) … … 610 913 611 914 if (enabledFiles.size() == 0) 612 DEBUG(<<"Aucun fichier ne va tre sorti dans le contexte nomm\""915 DEBUG(<<"Aucun fichier ne va être sorti dans le contexte nommé \"" 613 916 << getId() << "\" !"); 614 } 615 616 void CContext::findEnabledReadModeFiles(void) 917 918 } 919 920 void CContext::distributeFiles(void) 921 { 922 bool distFileMemory=false ; 923 distFileMemory=CXios::getin<bool>("server2_dist_file_memory", distFileMemory); 924 925 if (distFileMemory) distributeFileOverMemoryBandwith() ; 926 else distributeFileOverBandwith() ; 927 } 928 929 930 void CContext::distributeFileOverBandwith(void) 931 { 932 double eps=std::numeric_limits<double>::epsilon()*10 ; 933 934 // If primary server 935 if (hasServer && hasClient) 936 { 937 std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out); 938 int nbPools = clientPrimServer.size(); 939 940 // (1) Find all enabled files in write mode 941 // for (int i = 0; i < this->enabledFiles.size(); ++i) 942 // { 943 // if (enabledFiles[i]->mode.isEmpty() || (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write )) 944 // enabledWriteModeFiles.push_back(enabledFiles[i]); 945 // } 946 947 // (2) Estimate the data volume for each file 948 int size = this->enabledWriteModeFiles.size(); 949 std::vector<std::pair<double, CFile*> > dataSizeMap; 950 double dataPerPool = 0; 951 int nfield=0 ; 952 ofs<<size<<endl ; 953 for (size_t i = 0; i < size; ++i) 954 { 955 CFile* file = this->enabledWriteModeFiles[i]; 956 ofs<<file->getId()<<endl ; 957 StdSize dataSize=0; 958 std::vector<CField*> enabledFields = file->getEnabledFields(); 959 size_t numEnabledFields = enabledFields.size(); 960 ofs<<numEnabledFields<<endl ; 961 for (size_t j = 0; j < numEnabledFields; ++j) 962 { 963 dataSize += enabledFields[j]->getGlobalWrittenSize() ; 964 ofs<<enabledFields[j]->grid->getId()<<endl ; 965 ofs<<enabledFields[j]->getGlobalWrittenSize()<<endl ; 966 } 967 double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ; 968 double dataSizeSec= dataSize/ outFreqSec; 969 ofs<<dataSizeSec<<endl ; 970 nfield++ ; 971 // add epsilon*nField to dataSizeSec in order to preserve reproductive ordering when sorting 972 dataSizeMap.push_back(make_pair(dataSizeSec + dataSizeSec * eps * nfield , file)); 973 dataPerPool += dataSizeSec; 974 } 975 dataPerPool /= nbPools; 976 std::sort(dataSizeMap.begin(), dataSizeMap.end()); 977 978 // (3) Assign contextClient to each enabled file 979 980 std::multimap<double,int> poolDataSize ; 981 // multimap is not garanty to preserve stable sorting in c++98 but it seems it does for c++11 982 983 int j; 984 double dataSize ; 985 for (j = 0 ; j < nbPools ; ++j) poolDataSize.insert(std::pair<double,int>(0.,j)) ; 986 987 for (int i = dataSizeMap.size()-1; i >= 0; --i) 988 { 989 dataSize=(*poolDataSize.begin()).first ; 990 j=(*poolDataSize.begin()).second ; 991 dataSizeMap[i].second->setContextClient(clientPrimServer[j]); 992 dataSize+=dataSizeMap[i].first; 993 poolDataSize.erase(poolDataSize.begin()) ; 994 poolDataSize.insert(std::pair<double,int>(dataSize,j)) ; 995 } 996 997 for (std::multimap<double,int>:: iterator it=poolDataSize.begin() ; it!=poolDataSize.end(); ++it) 998 { 999 #pragma omp critical (_output) 1000 info(30)<<"Load Balancing for servers (perfect=1) : "<<it->second<<" : ratio "<<it->first*1./dataPerPool<<endl ; 1001 } 1002 1003 for (int i = 0; i < this->enabledReadModeFiles.size(); ++i) 1004 { 1005 enabledReadModeFiles[i]->setContextClient(client); 1006 } 1007 } 1008 else 1009 { 1010 for (int i = 0; i < this->enabledFiles.size(); ++i) 1011 enabledFiles[i]->setContextClient(client); 1012 } 1013 } 1014 1015 void CContext::distributeFileOverMemoryBandwith(void) 1016 { 1017 // If primary server 1018 if (hasServer && hasClient) 1019 { 1020 int nbPools = clientPrimServer.size(); 1021 double ratio=0.5 ; 1022 ratio=CXios::getin<double>("server2_dist_file_memory_ratio", ratio); 1023 1024 int nFiles = this->enabledWriteModeFiles.size(); 1025 vector<SDistFile> files(nFiles); 1026 vector<SDistGrid> grids; 1027 map<string,int> gridMap ; 1028 string gridId; 1029 int gridIndex=0 ; 1030 1031 for (size_t i = 0; i < nFiles; ++i) 1032 { 1033 StdSize dataSize=0; 1034 CFile* file = this->enabledWriteModeFiles[i]; 1035 std::vector<CField*> enabledFields = file->getEnabledFields(); 1036 size_t numEnabledFields = enabledFields.size(); 1037 1038 files[i].id_=file->getId() ; 1039 files[i].nbGrids_=numEnabledFields; 1040 files[i].assignedGrid_ = new int[files[i].nbGrids_] ; 1041 1042 for (size_t j = 0; j < numEnabledFields; ++j) 1043 { 1044 gridId=enabledFields[j]->grid->getId() ; 1045 if (gridMap.find(gridId)==gridMap.end()) 1046 { 1047 gridMap[gridId]=gridIndex ; 1048 SDistGrid newGrid; 1049 grids.push_back(newGrid) ; 1050 gridIndex++ ; 1051 } 1052 files[i].assignedGrid_[j]=gridMap[gridId] ; 1053 grids[files[i].assignedGrid_[j]].size_=enabledFields[j]->getGlobalWrittenSize() ; 1054 dataSize += enabledFields[j]->getGlobalWrittenSize() ; // usefull 1055 } 1056 double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ; 1057 files[i].bandwith_= dataSize/ outFreqSec ; 1058 } 1059 1060 double bandwith=0 ; 1061 double memory=0 ; 1062 1063 for(int i=0; i<nFiles; i++) bandwith+=files[i].bandwith_ ; 1064 for(int i=0; i<nFiles; i++) files[i].bandwith_ = files[i].bandwith_/bandwith * ratio ; 1065 1066 for(int i=0; i<grids.size(); i++) memory+=grids[i].size_ ; 1067 for(int i=0; i<grids.size(); i++) grids[i].size_ = grids[i].size_ / memory * (1.0-ratio) ; 1068 1069 distributeFileOverServer2(nbPools, grids.size(), &grids[0], nFiles, &files[0]) ; 1070 1071 vector<double> memorySize(nbPools,0.) ; 1072 vector< set<int> > serverGrids(nbPools) ; 1073 vector<double> bandwithSize(nbPools,0.) ; 1074 1075 for (size_t i = 0; i < nFiles; ++i) 1076 { 1077 bandwithSize[files[i].assignedServer_] += files[i].bandwith_* bandwith /ratio ; 1078 for(int j=0 ; j<files[i].nbGrids_;j++) 1079 { 1080 if (serverGrids[files[i].assignedServer_].find(files[i].assignedGrid_[j]) == serverGrids[files[i].assignedServer_].end()) 1081 { 1082 memorySize[files[i].assignedServer_]+= grids[files[i].assignedGrid_[j]].size_ * memory / (1.0-ratio); 1083 serverGrids[files[i].assignedServer_].insert(files[i].assignedGrid_[j]) ; 1084 } 1085 } 1086 enabledWriteModeFiles[i]->setContextClient(clientPrimServer[files[i].assignedServer_]) ; 1087 delete [] files[i].assignedGrid_ ; 1088 } 1089 1090 for (int i = 0; i < nbPools; ++i) 1091 { 1092 #pragma omp critical (_output) 1093 info(100)<<"Pool server level2 "<<i<<" assigned file bandwith "<<bandwithSize[i]*86400.*4./1024/1024.<<" Mb / days"<<endl ; 1094 } 1095 for (int i = 0; i < nbPools; ++i) 1096 { 1097 #pragma omp critical (_output) 1098 info(100)<<"Pool server level2 "<<i<<" assigned grid memory "<<memorySize[i]*100/1024./1024.<<" Mb"<<endl ; 1099 } 1100 1101 for (int i = 0; i < this->enabledReadModeFiles.size(); ++i) 1102 { 1103 enabledReadModeFiles[i]->setContextClient(client); 1104 } 1105 1106 } 1107 else 1108 { 1109 for (int i = 0; i < this->enabledFiles.size(); ++i) 1110 enabledFiles[i]->setContextClient(client); 1111 } 1112 } 1113 1114 1115 1116 /*! 1117 Find all files in write mode 1118 */ 1119 void CContext::findEnabledWriteModeFiles(void) 617 1120 { 618 1121 int size = this->enabledFiles.size(); 619 1122 for (int i = 0; i < size; ++i) 620 1123 { 1124 if (enabledFiles[i]->mode.isEmpty() || 1125 (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write )) 1126 enabledWriteModeFiles.push_back(enabledFiles[i]); 1127 } 1128 } 1129 1130 /*! 1131 Find all files in read mode 1132 */ 1133 void CContext::findEnabledReadModeFiles(void) 1134 { 1135 int size = this->enabledFiles.size(); 1136 for (int i = 0; i < size; ++i) 1137 { 621 1138 if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read) 622 1139 enabledReadModeFiles.push_back(enabledFiles[i]); … … 631 1148 for (; it != end; it++) 632 1149 { 1150 #pragma omp critical (_output) 633 1151 info(30)<<"Closing File : "<<(*it)->getId()<<endl; 634 1152 (*it)->close(); … … 669 1187 recvRegistry(event); 670 1188 return true; 671 break; 672 1189 break; 1190 case EVENT_ID_POST_PROCESS_GLOBAL_ATTRIBUTES: 1191 recvPostProcessingGlobalAttributes(event); 1192 return true; 1193 break; 1194 case EVENT_ID_PROCESS_GRID_ENABLED_FIELDS: 1195 recvProcessingGridOfEnabledFields(event); 1196 return true; 1197 break; 673 1198 default : 674 1199 ERROR("bool CContext::dispatchEvent(CEventServer& event)", … … 682 1207 void CContext::sendCloseDefinition(void) 683 1208 { 684 CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION); 685 if (client->isServerLeader()) 686 { 687 CMessage msg; 688 msg<<this->getIdServer(); 689 const std::list<int>& ranks = client->getRanksServerLeader(); 690 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 691 event.push(*itRank,1,msg); 692 client->sendEvent(event); 693 } 694 else client->sendEvent(event); 1209 // Use correct context client to send message 1210 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 1211 for (int i = 0; i < nbSrvPools; ++i) 1212 { 1213 CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client; 1214 CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION); 1215 if (contextClientTmp->isServerLeader()) 1216 { 1217 CMessage msg; 1218 if (hasServer) 1219 msg<<this->getIdServer(i); 1220 else 1221 msg<<this->getIdServer(); 1222 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 1223 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 1224 event.push(*itRank,1,msg); 1225 contextClientTmp->sendEvent(event); 1226 } 1227 else contextClientTmp->sendEvent(event); 1228 } 695 1229 } 696 1230 … … 698 1232 void CContext::recvCloseDefinition(CEventServer& event) 699 1233 { 700 701 1234 CBufferIn* buffer=event.subEvents.begin()->buffer; 702 1235 string id; … … 708 1241 void CContext::sendUpdateCalendar(int step) 709 1242 { 710 if (!hasServer) 711 { 1243 // Use correct context client to send message 1244 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 1245 for (int i = 0; i < nbSrvPools; ++i) 1246 { 1247 CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client; 712 1248 CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR); 713 if (client->isServerLeader()) 714 { 715 CMessage msg; 716 msg<<this->getIdServer()<<step; 717 const std::list<int>& ranks = client->getRanksServerLeader(); 718 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 719 event.push(*itRank,1,msg); 720 client->sendEvent(event); 721 } 722 else client->sendEvent(event); 1249 1250 if (contextClientTmp->isServerLeader()) 1251 { 1252 CMessage msg; 1253 if (hasServer) 1254 msg<<this->getIdServer(i)<<step; 1255 else 1256 msg<<this->getIdServer()<<step; 1257 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 1258 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 1259 event.push(*itRank,1,msg); 1260 contextClientTmp->sendEvent(event); 1261 } 1262 else contextClientTmp->sendEvent(event); 723 1263 } 724 1264 } … … 739 1279 buffer>>step; 740 1280 updateCalendar(step); 1281 if (hasClient && hasServer) 1282 { 1283 sendUpdateCalendar(step); 1284 } 741 1285 } 742 1286 … … 744 1288 void CContext::sendCreateFileHeader(void) 745 1289 { 746 CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER); 747 if (client->isServerLeader()) 748 { 749 CMessage msg; 750 msg<<this->getIdServer(); 751 const std::list<int>& ranks = client->getRanksServerLeader(); 752 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 753 event.push(*itRank,1,msg) ; 754 client->sendEvent(event); 755 } 756 else client->sendEvent(event); 1290 // Use correct context client to send message 1291 // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1; 1292 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 1293 for (int i = 0; i < nbSrvPools; ++i) 1294 { 1295 CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client; 1296 CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER); 1297 1298 if (contextClientTmp->isServerLeader()) 1299 { 1300 CMessage msg; 1301 if (hasServer) 1302 msg<<this->getIdServer(i); 1303 else 1304 msg<<this->getIdServer(); 1305 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 1306 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 1307 event.push(*itRank,1,msg) ; 1308 contextClientTmp->sendEvent(event); 1309 } 1310 else contextClientTmp->sendEvent(event); 1311 } 757 1312 } 758 1313 … … 769 1324 void CContext::recvCreateFileHeader(CBufferIn& buffer) 770 1325 { 771 createFileHeader(); 1326 if (!hasClient && hasServer) 1327 createFileHeader(); 1328 } 1329 1330 //! Client side: Send a message to do some post processing on server 1331 void CContext::sendProcessingGridOfEnabledFields() 1332 { 1333 // Use correct context client to send message 1334 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 1335 for (int i = 0; i < nbSrvPools; ++i) 1336 { 1337 CContextClient* contextClientTmp = (0 != clientPrimServer.size()) ? clientPrimServer[i] : client; 1338 CEventClient event(getType(),EVENT_ID_PROCESS_GRID_ENABLED_FIELDS); 1339 1340 if (contextClientTmp->isServerLeader()) 1341 { 1342 CMessage msg; 1343 if (hasServer) 1344 msg<<this->getIdServer(i); 1345 else 1346 msg<<this->getIdServer(); 1347 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 1348 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 1349 event.push(*itRank,1,msg); 1350 contextClientTmp->sendEvent(event); 1351 } 1352 else contextClientTmp->sendEvent(event); 1353 } 1354 } 1355 1356 //! Server side: Receive a message to do some post processing 1357 void CContext::recvProcessingGridOfEnabledFields(CEventServer& event) 1358 { 1359 CBufferIn* buffer=event.subEvents.begin()->buffer; 1360 string id; 1361 *buffer>>id; 772 1362 } 773 1363 … … 775 1365 void CContext::sendPostProcessing() 776 1366 { 777 if (!hasServer) 778 { 1367 // Use correct context client to send message 1368 // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1; 1369 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 1370 for (int i = 0; i < nbSrvPools; ++i) 1371 { 1372 CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client; 779 1373 CEventClient event(getType(),EVENT_ID_POST_PROCESS); 780 if (c lient->isServerLeader())1374 if (contextClientTmp->isServerLeader()) 781 1375 { 782 1376 CMessage msg; 783 msg<<this->getIdServer(); 784 const std::list<int>& ranks = client->getRanksServerLeader(); 1377 if (hasServer) 1378 msg<<this->getIdServer(i); 1379 else 1380 msg<<this->getIdServer(); 1381 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 785 1382 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 786 787 c lient->sendEvent(event);788 } 789 else c lient->sendEvent(event);1383 event.push(*itRank,1,msg); 1384 contextClientTmp->sendEvent(event); 1385 } 1386 else contextClientTmp->sendEvent(event); 790 1387 } 791 1388 } … … 818 1415 } 819 1416 1417 const StdString& CContext::getIdServer(const int i) 1418 { 1419 idServer_ = this->getId(); 1420 idServer_ += "_server_"; 1421 idServer_ += boost::lexical_cast<string>(i); 1422 return idServer_; 1423 } 1424 1425 820 1426 /*! 821 1427 \brief Do some simple post processings after parsing xml file … … 839 1445 this->solveAllInheritance(); 840 1446 1447 // ShowTree(info(10)); 1448 841 1449 // Check if some axis, domains or grids are eligible to for compressed indexed output. 842 1450 // Warning: This must be done after solving the inheritance and before the rest of post-processing 843 checkAxisDomainsGridsEligibilityForCompressedOutput(); 1451 checkAxisDomainsGridsEligibilityForCompressedOutput(); 844 1452 845 1453 // Check if some automatic time series should be generated 846 // Warning: This must be done after solving the inheritance and before the rest of post-processing 847 prepareTimeseries(); 848 849 //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers sortir. 850 this->findEnabledFiles(); 851 this->findEnabledReadModeFiles(); 852 853 // Find all enabled fields of each file 854 this->findAllEnabledFields(); 855 this->findAllEnabledFieldsInReadModeFiles(); 856 857 if (hasClient && !hasServer) 858 { 859 // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis) 860 this->readAttributesOfEnabledFieldsInReadModeFiles(); 861 } 1454 // Warning: This must be done after solving the inheritance and before the rest of post-processing 1455 1456 // The timeseries should only be prepared in client 1457 if (hasClient && !hasServer) prepareTimeseries(); 1458 1459 //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir. 1460 findEnabledFiles(); 1461 findEnabledWriteModeFiles(); 1462 findEnabledReadModeFiles(); 1463 1464 // For now, only read files with client and only one level server 1465 // if (hasClient && !hasServer) findEnabledReadModeFiles(); 1466 1467 // Find all enabled fields of each file 1468 findAllEnabledFieldsInFiles(this->enabledWriteModeFiles); 1469 findAllEnabledFieldsInFiles(this->enabledReadModeFiles); 1470 1471 // For now, only read files with client and only one level server 1472 // if (hasClient && !hasServer) 1473 // findAllEnabledFieldsInFiles(this->enabledReadModeFiles); 1474 1475 if (hasClient && !hasServer) 1476 { 1477 initReadFiles(); 1478 // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis) 1479 this->readAttributesOfEnabledFieldsInReadModeFiles(); 1480 } 862 1481 863 1482 // Only search and rebuild all reference objects of enable fields, don't transform 864 1483 this->solveOnlyRefOfEnabledFields(false); 865 1484 866 // Search and rebuild all reference object of enabled fields 867 this->solveAllRefOfEnabledFields (false);1485 // Search and rebuild all reference object of enabled fields, and transform 1486 this->solveAllRefOfEnabledFieldsAndTransform(false); 868 1487 869 1488 // Find all fields with read access from the public API 870 findFieldsWithReadAccess();1489 if (hasClient && !hasServer) findFieldsWithReadAccess(); 871 1490 // and solve the all reference for them 872 solveAllRefOfFieldsWithReadAccess();1491 if (hasClient && !hasServer) solveAllRefOfFieldsWithReadAccess(); 873 1492 874 1493 isPostProcessed = true; … … 877 1496 /*! 878 1497 * Compute the required buffer size to send the attributes (mostly those grid related). 879 *880 1498 * \param maxEventSize [in/out] the size of the bigger event for each connected server 1499 * \param [in] contextClient 1500 * \param [in] bufferForWriting True if buffers are used for sending data for writing 1501 This flag is only true for client and server-1 for communication with server-2 881 1502 */ 882 std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize) 883 { 884 std::map<int, StdSize> attributesSize; 885 886 if (hasClient) 887 { 888 size_t numEnabledFiles = this->enabledFiles.size(); 889 for (size_t i = 0; i < numEnabledFiles; ++i) 890 { 891 CFile* file = this->enabledFiles[i]; 892 1503 std::map<int, StdSize> CContext::getAttributesBufferSize(std::map<int, StdSize>& maxEventSize, 1504 CContextClient* contextClient, bool bufferForWriting /*= "false"*/) 1505 { 1506 // As calendar attributes are sent even if there are no active files or fields, maps are initialized according the size of calendar attributes 1507 std::map<int, StdSize> attributesSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient); 1508 maxEventSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient); 1509 1510 std::vector<CFile*>& fileList = this->enabledFiles; 1511 size_t numEnabledFiles = fileList.size(); 1512 for (size_t i = 0; i < numEnabledFiles; ++i) 1513 { 1514 // CFile* file = this->enabledWriteModeFiles[i]; 1515 CFile* file = fileList[i]; 1516 std::vector<CField*> enabledFields = file->getEnabledFields(); 1517 size_t numEnabledFields = enabledFields.size(); 1518 for (size_t j = 0; j < numEnabledFields; ++j) 1519 { 1520 const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize(contextClient, bufferForWriting); 1521 std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end(); 1522 for (; it != itE; ++it) 1523 { 1524 // If attributesSize[it->first] does not exist, it will be zero-initialized 1525 // so we can use it safely without checking for its existence 1526 if (attributesSize[it->first] < it->second) 1527 attributesSize[it->first] = it->second; 1528 1529 if (maxEventSize[it->first] < it->second) 1530 maxEventSize[it->first] = it->second; 1531 } 1532 } 1533 } 1534 return attributesSize; 1535 } 1536 1537 /*! 1538 * Compute the required buffer size to send the fields data. 1539 * \param maxEventSize [in/out] the size of the bigger event for each connected server 1540 * \param [in] contextClient 1541 * \param [in] bufferForWriting True if buffers are used for sending data for writing 1542 This flag is only true for client and server-1 for communication with server-2 1543 */ 1544 std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize, 1545 CContextClient* contextClient, bool bufferForWriting /*= "false"*/) 1546 { 1547 std::map<int, StdSize> dataSize; 1548 1549 // Find all reference domain and axis of all active fields 1550 std::vector<CFile*>& fileList = bufferForWriting ? this->enabledWriteModeFiles : this->enabledReadModeFiles; 1551 size_t numEnabledFiles = fileList.size(); 1552 for (size_t i = 0; i < numEnabledFiles; ++i) 1553 { 1554 // CFile* file = this->enabledFiles[i]; 1555 CFile* file = fileList[i]; 1556 if (file->getContextClient() == contextClient) 1557 { 893 1558 std::vector<CField*> enabledFields = file->getEnabledFields(); 894 1559 size_t numEnabledFields = enabledFields.size(); 895 1560 for (size_t j = 0; j < numEnabledFields; ++j) 896 1561 { 897 const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize(); 898 std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end(); 899 for (; it != itE; ++it) 900 { 901 // If attributesSize[it->first] does not exist, it will be zero-initialized 902 // so we can use it safely without checking for its existance 903 if (attributesSize[it->first] < it->second) 904 attributesSize[it->first] = it->second; 905 906 if (maxEventSize[it->first] < it->second) 907 maxEventSize[it->first] = it->second; 908 } 909 } 910 } 911 } 912 913 return attributesSize; 914 } 915 916 /*! 917 * Compute the required buffer size to send the fields data. 918 * 919 * \param maxEventSize [in/out] the size of the bigger event for each connected server 920 */ 921 std::map<int, StdSize> CContext::getDataBufferSize(std::map<int, StdSize>& maxEventSize) 922 { 923 CFile::mode_attr::t_enum mode = hasClient ? CFile::mode_attr::write : CFile::mode_attr::read; 924 925 std::map<int, StdSize> dataSize; 926 927 // Find all reference domain and axis of all active fields 928 size_t numEnabledFiles = this->enabledFiles.size(); 929 for (size_t i = 0; i < numEnabledFiles; ++i) 930 { 931 CFile* file = this->enabledFiles[i]; 932 CFile::mode_attr::t_enum fileMode = file->mode.isEmpty() ? CFile::mode_attr::write : file->mode.getValue(); 933 934 if (fileMode == mode) 935 { 936 std::vector<CField*> enabledFields = file->getEnabledFields(); 937 size_t numEnabledFields = enabledFields.size(); 938 for (size_t j = 0; j < numEnabledFields; ++j) 939 { 940 const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize(); 1562 // const std::vector<std::map<int, StdSize> > mapSize = enabledFields[j]->getGridDataBufferSize(contextClient); 1563 const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize(contextClient,bufferForWriting); 941 1564 std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end(); 942 1565 for (; it != itE; ++it) … … 944 1567 // If dataSize[it->first] does not exist, it will be zero-initialized 945 1568 // so we can use it safely without checking for its existance 946 1569 if (CXios::isOptPerformance) 947 1570 dataSize[it->first] += it->second; 948 1571 else if (dataSize[it->first] < it->second) 949 1572 dataSize[it->first] = it->second; 950 1573 951 1574 if (maxEventSize[it->first] < it->second) 952 1575 maxEventSize[it->first] = it->second; 953 1576 } … … 955 1578 } 956 1579 } 957 958 1580 return dataSize; 959 1581 } 960 1582 961 1583 //! Client side: Send infomation of active files (files are enabled to write out) 962 void CContext::sendEnabledFiles( )963 { 964 int size = this->enabledFiles.size();1584 void CContext::sendEnabledFiles(const std::vector<CFile*>& activeFiles) 1585 { 1586 int size = activeFiles.size(); 965 1587 966 1588 // In a context, each type has a root definition, e.g: axis, domain, field. … … 972 1594 for (int i = 0; i < size; ++i) 973 1595 { 974 cfgrpPtr->sendCreateChild(this->enabledFiles[i]->getId()); 975 this->enabledFiles[i]->sendAllAttributesToServer(); 976 this->enabledFiles[i]->sendAddAllVariables(); 1596 CFile* f = activeFiles[i]; 1597 cfgrpPtr->sendCreateChild(f->getId(),f->getContextClient()); 1598 f->sendAllAttributesToServer(f->getContextClient()); 1599 f->sendAddAllVariables(f->getContextClient()); 977 1600 } 978 1601 } 979 1602 980 1603 //! Client side: Send information of active fields (ones are written onto files) 981 void CContext::sendEnabledFields ()982 { 983 int size = this->enabledFiles.size();1604 void CContext::sendEnabledFieldsInFiles(const std::vector<CFile*>& activeFiles) 1605 { 1606 int size = activeFiles.size(); 984 1607 for (int i = 0; i < size; ++i) 985 1608 { 986 this->enabledFiles[i]->sendEnabledFields();1609 activeFiles[i]->sendEnabledFields(activeFiles[i]->getContextClient()); 987 1610 } 988 1611 } … … 1107 1730 1108 1731 //! Client side: Send information of reference grid of active fields 1109 void CContext::sendRefGrid( )1732 void CContext::sendRefGrid(const std::vector<CFile*>& activeFiles) 1110 1733 { 1111 1734 std::set<StdString> gridIds; 1112 int sizeFile = this->enabledFiles.size();1735 int sizeFile = activeFiles.size(); 1113 1736 CFile* filePtr(NULL); 1114 1737 … … 1116 1739 for (int i = 0; i < sizeFile; ++i) 1117 1740 { 1118 filePtr = this->enabledFiles[i];1741 filePtr = activeFiles[i]; 1119 1742 std::vector<CField*> enabledFields = filePtr->getEnabledFields(); 1120 1743 int sizeField = enabledFields.size(); … … 1140 1763 } 1141 1764 1142 1143 //! Client side: Send information of reference domain and axis of active fields 1144 void CContext::sendRefDomainsAxis() 1765 //! Client side: Send information of reference domain, axis and scalar of active fields 1766 void CContext::sendRefDomainsAxisScalars(const std::vector<CFile*>& activeFiles) 1145 1767 { 1146 1768 std::set<StdString> domainIds, axisIds, scalarIds; 1147 1769 1148 1770 // Find all reference domain and axis of all active fields 1149 int numEnabledFiles = this->enabledFiles.size();1771 int numEnabledFiles = activeFiles.size(); 1150 1772 for (int i = 0; i < numEnabledFiles; ++i) 1151 1773 { 1152 std::vector<CField*> enabledFields = this->enabledFiles[i]->getEnabledFields();1774 std::vector<CField*> enabledFields = activeFiles[i]->getEnabledFields(); 1153 1775 int numEnabledFields = enabledFields.size(); 1154 1776 for (int j = 0; j < numEnabledFields; ++j) … … 1205 1827 void CContext::updateCalendar(int step) 1206 1828 { 1207 #pragma omp critical (_output) 1208 info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl; 1209 calendar->update(step); 1210 #pragma omp critical (_output) 1211 info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl; 1212 #ifdef XIOS_MEMTRACK_LIGHT 1213 #pragma omp critical (_output) 1214 info(50) << " Current memory used by XIOS : "<< MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte, at timestep "<<step<<" of context "<<this->getId()<<endl ; 1215 #endif 1216 if (hasClient) 1829 int prevStep = calendar->getStep(); 1830 1831 if (prevStep < step) 1217 1832 { 1218 doPostTimestepOperationsForEnabledReadModeFiles(); 1219 garbageCollector.invalidate(calendar->getCurrentDate()); 1833 if (hasClient && !hasServer) // For now we only use server level 1 to read data 1834 { 1835 doPreTimestepOperationsForEnabledReadModeFiles(); 1836 } 1837 1838 #pragma omp critical (_output) 1839 info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl; 1840 calendar->update(step); 1841 #pragma omp critical (_output) 1842 info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl; 1843 #ifdef XIOS_MEMTRACK_LIGHT 1844 #pragma omp critical (_output) 1845 info(50) << " Current memory used by XIOS : "<< MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte, at timestep "<<step<<" of context "<<this->getId()<<endl ; 1846 #endif 1847 1848 if (hasClient && !hasServer) // For now we only use server level 1 to read data 1849 { 1850 doPostTimestepOperationsForEnabledReadModeFiles(); 1851 garbageCollector.invalidate(calendar->getCurrentDate()); 1852 } 1220 1853 } 1854 else if (prevStep == step) 1855 { 1856 #pragma omp critical (_output) 1857 info(50) << "updateCalendar: already at step " << step << ", no operation done." << endl; 1858 } 1859 else // if (prevStep > step) 1860 ERROR("void CContext::updateCalendar(int step)", 1861 << "Illegal calendar update: previous step was " << prevStep << ", new step " << step << "is in the past!") 1862 } 1863 1864 void CContext::initReadFiles(void) 1865 { 1866 vector<CFile*>::const_iterator it; 1867 1868 for (it=enabledReadModeFiles.begin(); it != enabledReadModeFiles.end(); it++) 1869 { 1870 (*it)->initRead(); 1871 } 1221 1872 } 1222 1873 1223 1874 //! Server side: Create header of netcdf file 1224 void CContext::createFileHeader(void 1875 void CContext::createFileHeader(void) 1225 1876 { 1226 1877 vector<CFile*>::const_iterator it; 1227 1878 1228 1879 for (it=enabledFiles.begin(); it != enabledFiles.end(); it++) 1880 // for (it=enabledWriteModeFiles.begin(); it != enabledWriteModeFiles.end(); it++) 1229 1881 { 1230 (*it)->init File();1882 (*it)->initWrite(); 1231 1883 } 1232 1884 } … … 1269 1921 return (context); 1270 1922 } 1271 1272 1923 1273 1924 … … 1295 1946 registryOut->hierarchicalGatherRegistry() ; 1296 1947 1297 CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY); 1298 if (client->isServerLeader()) 1948 // Use correct context client to send message 1949 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 1950 for (int i = 0; i < nbSrvPools; ++i) 1299 1951 { 1300 CMessage msg ; 1301 msg<<this->getIdServer(); 1302 if (client->clientRank==0) msg<<*registryOut ; 1303 const std::list<int>& ranks = client->getRanksServerLeader(); 1304 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 1305 event.push(*itRank,1,msg); 1306 client->sendEvent(event); 1307 } 1308 else client->sendEvent(event); 1952 CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client; 1953 CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY); 1954 if (contextClientTmp->isServerLeader()) 1955 { 1956 CMessage msg ; 1957 if (hasServer) 1958 msg<<this->getIdServer(i); 1959 else 1960 msg<<this->getIdServer(); 1961 if (contextClientTmp->clientRank==0) msg<<*registryOut ; 1962 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 1963 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 1964 event.push(*itRank,1,msg); 1965 contextClientTmp->sendEvent(event); 1966 } 1967 else contextClientTmp->sendEvent(event); 1968 } 1309 1969 } 1310 1970 1971 /*! 1972 * \fn bool CContext::isFinalized(void) 1973 * Context is finalized if it received context post finalize event. 1974 */ 1975 bool CContext::isFinalized(void) 1976 { 1977 return finalized; 1978 } 1979 1311 1980 } // namespace xios
Note: See TracChangeset
for help on using the changeset viewer.