Changeset 676 for XIOS/trunk/src/node/axis.cpp
- Timestamp:
- 08/25/15 16:52:45 (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/trunk/src/node/axis.cpp
r670 r676 8 8 #include "context.hpp" 9 9 #include "context_client.hpp" 10 #include "context_server.hpp" 10 11 #include "xios_spl.hpp" 11 12 #include "inverse_axis.hpp" … … 14 15 #include "server_distribution_description.hpp" 15 16 #include "client_server_mapping_distributed.hpp" 17 #include "distribution_client.hpp" 16 18 17 19 namespace xios { … … 22 24 : CObjectTemplate<CAxis>() 23 25 , CAxisAttributes(), isChecked(false), relFiles(), baseRefObject(), areClientAttributesChecked_(false) 24 , isDistributed_(false), hasBounds_(false) 26 , isDistributed_(false), hasBounds_(false), isCompressible_(false) 27 , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0) 25 28 , transformationMap_(), global_zoom_begin(0), global_zoom_size(0) 26 29 { … … 30 33 : CObjectTemplate<CAxis>(id) 31 34 , CAxisAttributes(), isChecked(false), relFiles(), baseRefObject(), areClientAttributesChecked_(false) 32 , isDistributed_(false), hasBounds_(false) 35 , isDistributed_(false), hasBounds_(false), isCompressible_(false) 36 , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0) 33 37 , transformationMap_(), global_zoom_begin(0), global_zoom_size(0) 34 38 { … … 50 54 } 51 55 56 bool CAxis::isWrittenCompressed(const StdString& filename) const 57 { 58 return (this->relFilesCompressed.find(filename) != this->relFilesCompressed.end()); 59 } 60 52 61 bool CAxis::isDistributed(void) const 53 62 { … … 55 64 } 56 65 66 /*! 67 * Test whether the data defined on the axis can be outputted in a compressed way. 68 * 69 * \return true if and only if a mask was defined for this axis 70 */ 71 bool CAxis::isCompressible(void) const 72 { 73 return isCompressible_; 74 } 75 57 76 void CAxis::addRelFile(const StdString & filename) 58 77 { 59 78 this->relFiles.insert(filename); 79 } 80 81 void CAxis::addRelFileCompressed(const StdString& filename) 82 { 83 this->relFilesCompressed.insert(filename); 84 } 85 86 //---------------------------------------------------------------- 87 88 const std::vector<int>& CAxis::getIndexesToWrite(void) const 89 { 90 return indexesToWrite; 91 } 92 93 /*! 94 Returns the number of indexes written by each server. 95 \return the number of indexes written by each server 96 */ 97 int CAxis::getNumberWrittenIndexes() const 98 { 99 return numberWrittenIndexes_; 100 } 101 102 /*! 103 Returns the total number of indexes written by the servers. 104 \return the total number of indexes written by the servers 105 */ 106 int CAxis::getTotalNumberWrittenIndexes() const 107 { 108 return totalNumberWrittenIndexes_; 109 } 110 111 /*! 112 Returns the offset of indexes written by each server. 113 \return the offset of indexes written by each server 114 */ 115 int CAxis::getOffsetWrittenIndexes() const 116 { 117 return offsetWrittenIndexes_; 60 118 } 61 119 … … 171 229 } 172 230 231 void CAxis::checkEligibilityForCompressedOutput() 232 { 233 // We don't check if the mask is valid here, just if a mask has been defined at this point. 234 isCompressible_ = !mask.isEmpty(); 235 } 173 236 174 237 bool CAxis::dispatchEvent(CEventServer& event) … … 277 340 } 278 341 342 std::set<int> writtenInd; 343 if (isCompressible_) 344 { 345 for (int idx = 0; idx < data_index.numElements(); ++idx) 346 { 347 int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni); 348 349 if (ind >= 0 && ind < ni && mask(ind)) 350 { 351 ind += ibegin; 352 if (ind >= global_zoom_begin && ind <= zoom_end) 353 writtenInd.insert(ind); 354 } 355 } 356 } 357 279 358 std::vector<int> nGlobDomain(1); 280 359 nGlobDomain[0] = n_glo.getValue(); … … 303 382 iteVec = (globalAxisZoom).end(); 304 383 indSrv_.clear(); 384 indWrittenSrv_.clear(); 305 385 for (; it != ite; ++it) 306 386 { … … 315 395 indSrv_[rank].push_back(globalIndexTmp[i]); 316 396 } 397 398 if (writtenInd.count(globalIndexTmp[i])) 399 { 400 indWrittenSrv_[rank].push_back(globalIndexTmp[i]); 401 } 317 402 } 318 403 } … … 337 422 CContext* context = CContext::getCurrent(); 338 423 CContextClient* client = context->client; 339 CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE);340 341 int zoom_end = global_zoom_begin +global_zoom_size-1;342 int nb = 0;424 CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE); 425 426 int zoom_end = global_zoom_begin + global_zoom_size - 1; 427 int nb = 0; 343 428 for (size_t idx = 0; idx < n; ++idx) 344 429 { 345 430 size_t globalIndex = begin + idx; 346 431 if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb; 432 } 433 434 int nbWritten = 0; 435 if (isCompressible_) 436 { 437 for (int idx = 0; idx < data_index.numElements(); ++idx) 438 { 439 int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n); 440 441 if (ind >= 0 && ind < n && mask(ind)) 442 { 443 ind += begin; 444 if (ind >= global_zoom_begin && ind <= zoom_end) 445 ++nbWritten; 446 } 447 } 347 448 } 348 449 … … 359 460 } 360 461 462 CArray<int, 1> writtenInd(nbWritten); 463 nbWritten = 0; 464 if (isCompressible_) 465 { 466 for (int idx = 0; idx < data_index.numElements(); ++idx) 467 { 468 int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n); 469 470 if (ind >= 0 && ind < n && mask(ind)) 471 { 472 ind += begin; 473 if (ind >= global_zoom_begin && ind <= zoom_end) 474 { 475 writtenInd(nbWritten) = ind; 476 ++nbWritten; 477 } 478 } 479 } 480 } 481 361 482 if (client->isServerLeader()) 362 483 { … … 366 487 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 367 488 { 368 // Use const int to ensure CMessage holds a copy of the value instead of just a reference369 489 msgs.push_back(CMessage()); 370 490 CMessage& msg = msgs.back(); 371 491 msg << this->getId(); 372 492 msg << val; 373 event.push(*itRank,1,msg); 493 if (isCompressible_) 494 msg << writtenInd; 495 event.push(*itRank, 1, msg); 374 496 } 375 497 client->sendEvent(event); … … 390 512 list<CMessage> list_msgsIndex, list_msgsVal; 391 513 list<CArray<int,1> > list_indi; 514 list<CArray<int,1> > list_writtenInd; 392 515 list<CArray<double,1> > list_val; 393 516 list<CArray<double,2> > list_bounds; … … 433 556 list_msgsIndex.back() << this->getId() << list_indi.back(); 434 557 558 if (isCompressible_) 559 { 560 std::vector<int>& writtenIndSrc = indWrittenSrv_[rank]; 561 list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size())); 562 CArray<int,1>& writtenInd = list_writtenInd.back(); 563 564 for (n = 0; n < writtenInd.numElements(); ++n) 565 writtenInd(n) = writtenIndSrc[n]; 566 567 list_msgsIndex.back() << writtenInd; 568 } 569 435 570 list_msgsVal.push_back(CMessage()); 436 571 list_msgsVal.back() << this->getId() << list_val.back(); … … 451 586 void CAxis::recvIndex(CEventServer& event) 452 587 { 588 CAxis* axis; 589 453 590 list<CEventServer::SSubEvent>::iterator it; 454 591 for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 455 592 { 456 593 CBufferIn* buffer = it->buffer; 457 string domainId; 458 *buffer >> domainId; 459 get(domainId)->recvIndex(it->rank, *buffer); 594 string axisId; 595 *buffer >> axisId; 596 axis = get(axisId); 597 axis->recvIndex(it->rank, *buffer); 598 } 599 600 if (axis->isCompressible_) 601 { 602 std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end()); 603 604 CContextServer* server = CContext::getCurrent()->server; 605 axis->numberWrittenIndexes_ = axis->indexesToWrite.size(); 606 MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm); 607 MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm); 608 axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_; 460 609 } 461 610 } … … 464 613 { 465 614 buffer >> indiSrv_[rank]; 615 616 if (isCompressible_) 617 { 618 CArray<int, 1> writtenIndexes; 619 buffer >> writtenIndexes; 620 indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements()); 621 for (int i = 0; i < writtenIndexes.numElements(); ++i) 622 indexesToWrite.push_back(writtenIndexes(i)); 623 } 466 624 } 467 625 … … 472 630 { 473 631 CBufferIn* buffer = it->buffer; 474 string domainId;475 *buffer >> domainId;476 get( domainId)->recvDistributedValue(it->rank, *buffer);632 string axisId; 633 *buffer >> axisId; 634 get(axisId)->recvDistributedValue(it->rank, *buffer); 477 635 } 478 636 } … … 503 661 void CAxis::recvNonDistributedValue(CEventServer& event) 504 662 { 663 CAxis* axis; 664 505 665 list<CEventServer::SSubEvent>::iterator it; 506 666 for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 507 667 { 508 668 CBufferIn* buffer = it->buffer; 509 string domainId; 510 *buffer >> domainId; 511 get(domainId)->recvNonDistributedValue(it->rank, *buffer); 669 string axisId; 670 *buffer >> axisId; 671 axis = get(axisId); 672 axis->recvNonDistributedValue(it->rank, *buffer); 673 } 674 675 if (axis->isCompressible_) 676 { 677 std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end()); 678 679 axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size(); 680 axis->offsetWrittenIndexes_ = 0; 512 681 } 513 682 } … … 526 695 bound_srv(1,ind) = bounds(1,ind); 527 696 } 697 } 698 699 if (isCompressible_) 700 { 701 CArray<int, 1> writtenIndexes; 702 buffer >> writtenIndexes; 703 indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements()); 704 for (int i = 0; i < writtenIndexes.numElements(); ++i) 705 indexesToWrite.push_back(writtenIndexes(i)); 528 706 } 529 707 } … … 561 739 msg << ni << begin << end; 562 740 msg << global_zoom_begin << global_zoom_size; 741 msg << isCompressible_; 563 742 564 743 event.push(*itRank,1,msg); … … 581 760 int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_size_tmp; 582 761 583 buffer>>ni_srv>>begin_srv>>end_srv>>global_zoom_begin_tmp>>global_zoom_size_tmp; 762 buffer >> ni_srv >> begin_srv >> end_srv; 763 buffer >> global_zoom_begin_tmp >> global_zoom_size_tmp; 764 buffer >> isCompressible_; 584 765 global_zoom_begin = global_zoom_begin_tmp; 585 766 global_zoom_size = global_zoom_size_tmp;
Note: See TracChangeset
for help on using the changeset viewer.