Changeset 1025 for XIOS/dev/dev_olga/src/node/axis.cpp
- Timestamp:
- 01/11/17 15:14:22 (7 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_olga/src/node/axis.cpp
r1009 r1025 131 131 } 132 132 133 /*! 134 Returns the start of indexes written by each server. 135 \return the start of indexes written by each server 136 */ 137 int CAxis::getStartWriteIndex() const 138 { 139 return start_write_index_; 140 } 141 142 /*! 143 Returns the count of indexes written by each server. 144 \return the count of indexes written by each server 145 */ 146 int CAxis::getCountWriteIndex() const 147 { 148 return count_write_index_; 149 } 150 151 /*! 152 Returns the local data written by each server. 153 */ 154 int CAxis::getLocalWriteSize() const 155 { 156 return local_write_size_; 157 } 158 159 /*! 160 Returns the global data written by all server. 161 */ 162 int CAxis::getGlobalWriteSize() const 163 { 164 return global_write_size_; 165 } 166 133 167 //---------------------------------------------------------------- 134 168 … … 166 200 { 167 201 // size estimation for sendDistributedValue 168 std::map<int, std::vector<size_t> >::const_iterator it, ite = indSrv_.end();202 boost::unordered_map<int, vector<size_t> >::const_iterator it, ite = indSrv_.end(); 169 203 for (it = indSrv_.begin(); it != ite; ++it) 170 204 { … … 292 326 if (global_zoom_begin.isEmpty()) global_zoom_begin.setValue(0); 293 327 if (global_zoom_n.isEmpty()) global_zoom_n.setValue(n_glo.getValue()); 328 if (zoom_index.isEmpty()) 329 { 330 zoom_index.setValue(index.getValue()); 331 } 294 332 } 295 333 … … 342 380 switch(event.type) 343 381 { 344 case EVENT_ID_SERVER_ATTRIBUT : 345 recvServerAttribut(event); 346 return true; 347 break; 348 case EVENT_ID_INDEX: 349 recvIndex(event); 382 // case EVENT_ID_SERVER_ATTRIBUT : 383 // recvServerAttribut(event); 384 // return true; 385 // break; 386 // case EVENT_ID_INDEX: 387 // recvIndex(event); 388 // return true; 389 // break; 390 case EVENT_ID_NON_DISTRIBUTED_ATTRIBUTES: 391 recvNonDistributedAttributes(event); 350 392 return true; 351 393 break; 352 case EVENT_ID_DISTRIBUTED_VALUE: 353 recvDistributedValue(event); 354 return true; 355 break; 356 case EVENT_ID_NON_DISTRIBUTED_VALUE: 357 recvNonDistributedValue(event); 394 case EVENT_ID_DISTRIBUTED_ATTRIBUTES: 395 recvDistributedAttributes(event); 358 396 return true; 359 397 break; … … 383 421 if (context->hasClient) 384 422 { 385 if ( n.getValue() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType);423 if (index.numElements() != n_glo.getValue()) computeConnectedServer(globalDim, orderPositionInGrid, distType); 386 424 } 387 425 … … 398 436 399 437 if (this->isChecked) return; 400 if (context->hasClient) 401 { 402 sendServerAttribut(globalDim, orderPositionInGrid, distType); 403 if (hasValue) sendValue(); 404 } 438 if (context->hasClient) sendAttributes(); 405 439 406 440 this->isChecked = true; 407 441 } 408 442 409 void CAxis::send Value()410 { 411 if ( n.getValue() == n_glo.getValue())412 sendNonDistributed Value();443 void CAxis::sendAttributes() 444 { 445 if (index.numElements() == n_glo.getValue()) 446 sendNonDistributedAttributes(); 413 447 else 414 sendDistributed Value();448 sendDistributedAttributes(); 415 449 } 416 450 … … 419 453 { 420 454 CContext* context = CContext::getCurrent(); 421 CContextClient* client = context->client;455 CContextClient* client = (0 != context->clientPrimServer) ? context->clientPrimServer : context->client; 422 456 int nbServer = client->serverSize; 423 457 int range, clientSize = client->clientSize; 424 458 int rank = client->clientRank; 425 459 426 size_t ni = this->n.getValue();427 size_t ibegin = this->begin.getValue();428 size_t zoom_end = global_zoom_begin+global_zoom_n-1;429 size_t nZoomCount = 0;460 // size_t ni = this->n.getValue(); 461 // size_t ibegin = this->begin.getValue(); 462 // size_t zoom_end = global_zoom_begin+global_zoom_n-1; 463 // size_t nZoomCount = 0; 430 464 size_t nbIndex = index.numElements(); 431 465 for (size_t idx = 0; idx < nbIndex; ++idx) 432 466 { 433 size_t globalIndex = index(idx); 434 if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount; 435 } 436 437 CArray<size_t,1> globalIndexAxis(nbIndex); 438 std::vector<size_t> globalAxisZoom(nZoomCount); 439 nZoomCount = 0; 440 for (size_t idx = 0; idx < nbIndex; ++idx) 441 { 442 size_t globalIndex = index(idx); 443 globalIndexAxis(idx) = globalIndex; 444 if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) 445 { 446 globalAxisZoom[nZoomCount] = globalIndex; 447 ++nZoomCount; 448 } 449 } 450 451 std::set<int> writtenInd; 452 if (isCompressible_) 453 { 454 for (int idx = 0; idx < data_index.numElements(); ++idx) 455 { 456 int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni); 457 458 if (ind >= 0 && ind < ni && mask(ind)) 459 { 460 ind += ibegin; 461 if (ind >= global_zoom_begin && ind <= zoom_end) 462 writtenInd.insert(ind); 463 } 464 } 465 } 466 467 CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer); 467 globalLocalIndexMap_[index(idx)] = idx; 468 // size_t globalIndex = index(idx); 469 // if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount; 470 } 471 472 // CArray<size_t,1> globalIndexAxis(nbIndex); 473 // std::vector<size_t> globalAxisZoom(nZoomCount); 474 // nZoomCount = 0; 475 // for (size_t idx = 0; idx < nbIndex; ++idx) 476 // { 477 // size_t globalIndex = index(idx); 478 // globalIndexAxis(idx) = globalIndex; 479 // if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) 480 // { 481 // globalAxisZoom[nZoomCount] = globalIndex; 482 // ++nZoomCount; 483 // } 484 // } 485 486 // std::set<int> writtenInd; 487 // if (isCompressible_) 488 // { 489 // for (int idx = 0; idx < data_index.numElements(); ++idx) 490 // { 491 // int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni); 492 493 // if (ind >= 0 && ind < ni && mask(ind)) 494 // { 495 // ind += ibegin; 496 // if (ind >= global_zoom_begin && ind <= zoom_end) 497 // writtenInd.insert(ind); 498 // } 499 // } 500 // } 501 502 CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer, distType); 468 503 int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed(); 469 504 CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer; … … 493 528 } 494 529 530 CArray<size_t,1> globalIndex(index.numElements()); 531 for (size_t idx = 0; idx < globalIndex.numElements(); ++idx) 532 globalIndex(idx) = index(idx); 533 495 534 CServerDistributionDescription serverDescription(nGlobAxis, nbServer); 496 535 serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd)); 497 536 CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm); 498 clientServerMap->computeServerIndexMapping(globalIndex Axis);537 clientServerMap->computeServerIndexMapping(globalIndex); 499 538 globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer(); 500 539 delete clientServerMap; … … 514 553 } 515 554 516 CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(), 517 ite = globalIndexAxisOnServer.end(); 518 std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(), 519 iteVec = (globalAxisZoom).end(); 520 indSrv_.clear(); 521 indWrittenSrv_.clear(); 522 for (; it != ite; ++it) 523 { 524 int rank = it->first; 525 const std::vector<size_t>& globalIndexTmp = it->second; 526 int nb = globalIndexTmp.size(); 527 528 for (int i = 0; i < nb; ++i) 529 { 530 if (std::binary_search(itbVec, iteVec, globalIndexTmp[i])) 531 { 532 indSrv_[rank].push_back(globalIndexTmp[i]); 533 } 534 535 if (writtenInd.count(globalIndexTmp[i])) 536 { 537 indWrittenSrv_[rank].push_back(globalIndexTmp[i]); 538 } 539 } 540 } 555 indSrv_.swap(globalIndexAxisOnServer); 556 557 // CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(), 558 // ite = globalIndexAxisOnServer.end(); 559 CClientServerMapping::GlobalIndexMap::const_iterator it = indSrv_.begin(), 560 ite = indSrv_.end(); 561 // std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(), 562 // iteVec = (globalAxisZoom).end(); 563 // indSrv_.clear(); 564 // indWrittenSrv_.clear(); 565 // for (; it != ite; ++it) 566 // { 567 // int rank = it->first; 568 // const std::vector<size_t>& globalIndexTmp = it->second; 569 // int nb = globalIndexTmp.size(); 570 571 // for (int i = 0; i < nb; ++i) 572 // { 573 // if (std::binary_search(itbVec, iteVec, globalIndexTmp[i])) 574 // { 575 // indSrv_[rank].push_back(globalIndexTmp[i]); 576 // } 577 578 // if (writtenInd.count(globalIndexTmp[i])) 579 // { 580 // indWrittenSrv_[rank].push_back(globalIndexTmp[i]); 581 // } 582 // } 583 // } 541 584 542 585 connectedServerRank_.clear(); 543 for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) {586 for (it = indSrv_.begin(); it != ite; ++it) { 544 587 connectedServerRank_.push_back(it->first); 545 588 } 546 589 547 if (!indSrv_.empty())548 {549 std::map<int, vector<size_t> >::const_iterator itIndSrv = indSrv_.begin(),550 iteIndSrv = indSrv_.end();551 connectedServerRank_.clear();552 for (; itIndSrv != iteIndSrv; ++itIndSrv)553 connectedServerRank_.push_back(itIndSrv->first);554 }590 // if (!indSrv_.empty()) 591 // { 592 // std::map<int, vector<size_t> >::const_iterator itIndSrv = indSrv_.begin(), 593 // iteIndSrv = indSrv_.end(); 594 // connectedServerRank_.clear(); 595 // for (; itIndSrv != iteIndSrv; ++itIndSrv) 596 // connectedServerRank_.push_back(itIndSrv->first); 597 // } 555 598 nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_); 556 599 } 557 600 558 void CAxis::sendNonDistributedValue() 601 602 // void CAxis::computeConnectedServer(const std::vector<int>& globalDim, int orderPositionInGrid, 603 // CServerDistributionDescription::ServerDistributionType distType) 604 // { 605 // CContext* context = CContext::getCurrent(); 606 // CContextClient* client = (0 != context->clientPrimServer) ? context->clientPrimServer : context->client; 607 // int nbServer = client->serverSize; 608 // int range, clientSize = client->clientSize; 609 // int rank = client->clientRank; 610 611 // size_t ni = this->n.getValue(); 612 // size_t ibegin = this->begin.getValue(); 613 // size_t zoom_end = global_zoom_begin+global_zoom_n-1; 614 // size_t nZoomCount = 0; 615 // size_t nbIndex = index.numElements(); 616 // for (size_t idx = 0; idx < nbIndex; ++idx) 617 // { 618 // size_t globalIndex = index(idx); 619 // if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nZoomCount; 620 // } 621 622 // CArray<size_t,1> globalIndexAxis(nbIndex); 623 // std::vector<size_t> globalAxisZoom(nZoomCount); 624 // nZoomCount = 0; 625 // for (size_t idx = 0; idx < nbIndex; ++idx) 626 // { 627 // size_t globalIndex = index(idx); 628 // globalIndexAxis(idx) = globalIndex; 629 // if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) 630 // { 631 // globalAxisZoom[nZoomCount] = globalIndex; 632 // ++nZoomCount; 633 // } 634 // } 635 636 // std::set<int> writtenInd; 637 // if (isCompressible_) 638 // { 639 // for (int idx = 0; idx < data_index.numElements(); ++idx) 640 // { 641 // int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni); 642 643 // if (ind >= 0 && ind < ni && mask(ind)) 644 // { 645 // ind += ibegin; 646 // if (ind >= global_zoom_begin && ind <= zoom_end) 647 // writtenInd.insert(ind); 648 // } 649 // } 650 // } 651 652 // CServerDistributionDescription serverDescriptionGlobal(globalDim, nbServer); 653 // int distributedDimensionOnServer = serverDescriptionGlobal.getDimensionDistributed(); 654 // CClientServerMapping::GlobalIndexMap globalIndexAxisOnServer; 655 // if (distributedDimensionOnServer == orderPositionInGrid) // So we have distributed axis on client side and also on server side* 656 // { 657 // std::vector<int> nGlobAxis(1); 658 // nGlobAxis[0] = n_glo.getValue(); 659 660 // size_t globalSizeIndex = 1, indexBegin, indexEnd; 661 // for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i]; 662 // indexBegin = 0; 663 // if (globalSizeIndex <= clientSize) 664 // { 665 // indexBegin = rank%globalSizeIndex; 666 // indexEnd = indexBegin; 667 // } 668 // else 669 // { 670 // for (int i = 0; i < clientSize; ++i) 671 // { 672 // range = globalSizeIndex / clientSize; 673 // if (i < (globalSizeIndex%clientSize)) ++range; 674 // if (i == client->clientRank) break; 675 // indexBegin += range; 676 // } 677 // indexEnd = indexBegin + range - 1; 678 // } 679 680 // CServerDistributionDescription serverDescription(nGlobAxis, nbServer); 681 // serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t,size_t>(indexBegin, indexEnd)); 682 // CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm); 683 // clientServerMap->computeServerIndexMapping(globalIndexAxis); 684 // globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer(); 685 // delete clientServerMap; 686 // } 687 // else 688 // { 689 // std::vector<size_t> globalIndexServer(n_glo.getValue()); 690 // for (size_t idx = 0; idx < n_glo.getValue(); ++idx) 691 // { 692 // globalIndexServer[idx] = idx; 693 // } 694 695 // for (int idx = 0; idx < nbServer; ++idx) 696 // { 697 // globalIndexAxisOnServer[idx] = globalIndexServer; 698 // } 699 // } 700 701 // CClientServerMapping::GlobalIndexMap::const_iterator it = globalIndexAxisOnServer.begin(), 702 // ite = globalIndexAxisOnServer.end(); 703 // std::vector<size_t>::const_iterator itbVec = (globalAxisZoom).begin(), 704 // iteVec = (globalAxisZoom).end(); 705 // indSrv_.clear(); 706 // indWrittenSrv_.clear(); 707 // for (; it != ite; ++it) 708 // { 709 // int rank = it->first; 710 // const std::vector<size_t>& globalIndexTmp = it->second; 711 // int nb = globalIndexTmp.size(); 712 713 // for (int i = 0; i < nb; ++i) 714 // { 715 // if (std::binary_search(itbVec, iteVec, globalIndexTmp[i])) 716 // { 717 // indSrv_[rank].push_back(globalIndexTmp[i]); 718 // } 719 720 // if (writtenInd.count(globalIndexTmp[i])) 721 // { 722 // indWrittenSrv_[rank].push_back(globalIndexTmp[i]); 723 // } 724 // } 725 // } 726 727 // connectedServerRank_.clear(); 728 // for (it = globalIndexAxisOnServer.begin(); it != ite; ++it) { 729 // connectedServerRank_.push_back(it->first); 730 // } 731 732 // if (!indSrv_.empty()) 733 // { 734 // std::map<int, vector<size_t> >::const_iterator itIndSrv = indSrv_.begin(), 735 // iteIndSrv = indSrv_.end(); 736 // connectedServerRank_.clear(); 737 // for (; itIndSrv != iteIndSrv; ++itIndSrv) 738 // connectedServerRank_.push_back(itIndSrv->first); 739 // } 740 // nbConnectedClients_ = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_); 741 // } 742 743 void CAxis::sendNonDistributedAttributes() 559 744 { 560 745 CContext* context = CContext::getCurrent(); 561 CContextClient* client = context->client; 562 CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_VALUE); 563 564 int zoom_end = global_zoom_begin + global_zoom_n - 1; 565 int nb = 0; 566 for (size_t idx = 0; idx < n; ++idx) 567 { 568 size_t globalIndex = begin + idx; 569 if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) ++nb; 570 } 571 572 int nbWritten = 0; 573 if (isCompressible_) 574 { 575 for (int idx = 0; idx < data_index.numElements(); ++idx) 576 { 577 int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n); 578 579 if (ind >= 0 && ind < n && mask(ind)) 580 { 581 ind += begin; 582 if (ind >= global_zoom_begin && ind <= zoom_end) 583 ++nbWritten; 584 } 585 } 586 } 587 588 CArray<double,1> val(nb); 589 nb = 0; 590 for (size_t idx = 0; idx < n; ++idx) 591 { 592 size_t globalIndex = begin + idx; 593 if (globalIndex >= global_zoom_begin && globalIndex <= zoom_end) 594 { 595 val(nb) = value(idx); 596 ++nb; 597 } 598 } 599 600 CArray<int, 1> writtenInd(nbWritten); 601 nbWritten = 0; 602 if (isCompressible_) 603 { 604 for (int idx = 0; idx < data_index.numElements(); ++idx) 605 { 606 int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, n); 607 608 if (ind >= 0 && ind < n && mask(ind)) 609 { 610 ind += begin; 611 if (ind >= global_zoom_begin && ind <= zoom_end) 612 { 613 writtenInd(nbWritten) = ind; 614 ++nbWritten; 615 } 616 } 617 } 746 CContextClient* client = (0 != context->clientPrimServer) ? context->clientPrimServer : context->client; 747 748 CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_ATTRIBUTES); 749 size_t nbIndex = index.numElements(); 750 size_t nbDataIndex = 0; 751 752 for (int idx = 0; idx < data_index.numElements(); ++idx) 753 { 754 int ind = data_index(idx); 755 if (ind >= 0 && ind < nbIndex) ++nbDataIndex; 756 } 757 758 CArray<int,1> dataIndex(nbDataIndex); 759 nbDataIndex = 0; 760 for (int idx = 0; idx < data_index.numElements(); ++idx) 761 { 762 int ind = data_index(idx); 763 if (ind >= 0 && ind < nbIndex) 764 { 765 dataIndex(nbDataIndex) = ind; 766 ++nbDataIndex; 767 } 618 768 } 619 769 620 770 if (client->isServerLeader()) 621 { 771 { 622 772 std::list<CMessage> msgs; 623 773 … … 628 778 CMessage& msg = msgs.back(); 629 779 msg << this->getId(); 630 msg << val; 631 if (isCompressible_) 632 msg << writtenInd; 780 msg << index.getValue() << dataIndex << zoom_index.getValue() << mask.getValue(); 781 msg << hasValue; 782 if (hasValue) msg << value.getValue(); 783 784 msg << hasBounds_; 785 if (hasBounds_) msg << bounds.getValue(); 786 633 787 event.push(*itRank, 1, msg); 634 788 } … … 638 792 } 639 793 640 void CAxis::sendDistributedValue(void) 794 void CAxis::recvNonDistributedAttributes(CEventServer& event) 795 { 796 list<CEventServer::SSubEvent>::iterator it; 797 for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 798 { 799 CBufferIn* buffer = it->buffer; 800 string axisId; 801 *buffer >> axisId; 802 get(axisId)->recvNonDistributedAttributes(it->rank, *buffer); 803 } 804 } 805 806 void CAxis::recvNonDistributedAttributes(int rank, CBufferIn& buffer) 807 { 808 CArray<int,1> tmp_index, tmp_data_index, tmp_zoom_index; 809 CArray<bool,1> tmp_mask; 810 CArray<double,1> tmp_val; 811 CArray<double,2> tmp_bnds; 812 813 buffer >> tmp_index; 814 index.reference(tmp_index); 815 buffer >> tmp_data_index; 816 data_index.reference(tmp_data_index); 817 buffer >> tmp_zoom_index; 818 zoom_index.reference(tmp_zoom_index); 819 buffer >> tmp_mask; 820 mask.reference(tmp_mask); 821 buffer >> hasValue; 822 if (hasValue) 823 { 824 buffer >> tmp_val; 825 value.reference(tmp_val); 826 } 827 828 buffer >> hasBounds_; 829 if (hasBounds_) 830 { 831 buffer >> tmp_bnds; 832 bounds.reference(tmp_bnds); 833 } 834 835 { 836 count_write_index_ = zoom_index.numElements(); 837 start_write_index_ = 0; 838 local_write_size_ = count_write_index_; 839 global_write_size_ = count_write_index_; 840 } 841 } 842 843 void CAxis::sendDistributedAttributes(void) 641 844 { 642 845 int ns, n, i, j, ind, nv, idx; 643 846 CContext* context = CContext::getCurrent(); 644 CContextClient* client=context->client; 645 646 // send value for each connected server 647 CEventClient eventIndex(getType(), EVENT_ID_INDEX); 648 CEventClient eventVal(getType(), EVENT_ID_DISTRIBUTED_VALUE); 649 650 list<CMessage> list_msgsIndex, list_msgsVal; 651 list<CArray<int,1> > list_indi; 652 list<CArray<int,1> > list_writtenInd; 847 CContextClient* client = (0 != context->clientPrimServer) ? context->clientPrimServer : context->client; 848 849 CEventClient eventData(getType(), EVENT_ID_DISTRIBUTED_ATTRIBUTES); 850 851 list<CMessage> listData; 852 list<CArray<int,1> > list_indi, list_dataInd, list_zoomInd; 853 list<CArray<bool,1> > list_mask; 653 854 list<CArray<double,1> > list_val; 654 855 list<CArray<double,2> > list_bounds; 655 856 656 std::map<int, std::vector<size_t> >::const_iterator it, iteMap; 857 int nbIndex = index.numElements(); 858 CArray<int,1> dataIndex(nbIndex); 859 dataIndex = -1; 860 for (int idx = 0; idx < data_index.numElements(); ++idx) 861 { 862 if (0 <= data_index(idx) && data_index(idx) < nbIndex) 863 dataIndex(idx) = data_index(idx); 864 } 865 866 boost::unordered_map<int, std::vector<size_t> >::const_iterator it, iteMap; 657 867 iteMap = indSrv_.end(); 658 868 for (int k = 0; k < connectedServerRank_.size(); ++k) … … 660 870 int nbData = 0; 661 871 int rank = connectedServerRank_[k]; 872 int nbSendingClient = nbConnectedClients_[rank]; 662 873 it = indSrv_.find(rank); 663 874 if (iteMap != it) … … 665 876 666 877 list_indi.push_back(CArray<int,1>(nbData)); 667 list_val.push_back(CArray<double,1>(nbData)); 878 list_dataInd.push_back(CArray<int,1>(nbData)); 879 list_zoomInd.push_back(CArray<int,1>(nbData)); 880 list_mask.push_back(CArray<bool,1>(nbData)); 881 882 883 if (hasValue) 884 list_val.push_back(CArray<double,1>(nbData)); 668 885 669 886 if (hasBounds_) … … 673 890 674 891 CArray<int,1>& indi = list_indi.back(); 675 CArray<double,1>& val = list_val.back(); 892 CArray<int,1>& dataIndi = list_dataInd.back(); 893 CArray<int,1>& zoomIndi = list_zoomInd.back(); 894 CArray<bool,1>& maskIndi = list_mask.back(); 676 895 677 896 for (n = 0; n < nbData; ++n) 678 897 { 679 898 idx = static_cast<int>(it->second[n]); 680 ind = idx - begin;681 682 val(n) = value(ind);683 899 indi(n) = idx; 900 901 ind = globalLocalIndexMap_[idx]; 902 dataIndi(n) = dataIndex(ind); 903 maskIndi(n) = mask(ind); 904 zoomIndi(n) = zoom_index(ind); 905 906 if (hasValue) 907 { 908 CArray<double,1>& val = list_val.back(); 909 val(n) = value(ind); 910 } 684 911 685 912 if (hasBounds_) … … 691 918 } 692 919 693 list_msgsIndex.push_back(CMessage()); 694 list_msgsIndex.back() << this->getId() << list_indi.back(); 695 696 if (isCompressible_) 697 { 698 std::vector<int>& writtenIndSrc = indWrittenSrv_[rank]; 699 list_writtenInd.push_back(CArray<int,1>(writtenIndSrc.size())); 700 CArray<int,1>& writtenInd = list_writtenInd.back(); 701 702 for (n = 0; n < writtenInd.numElements(); ++n) 703 writtenInd(n) = writtenIndSrc[n]; 704 705 list_msgsIndex.back() << writtenInd; 706 } 707 708 list_msgsVal.push_back(CMessage()); 709 list_msgsVal.back() << this->getId() << list_val.back(); 710 920 listData.push_back(CMessage()); 921 listData.back() << this->getId() 922 << list_indi.back() << list_dataInd.back() << list_zoomInd.back() << list_mask.back() 923 << hasValue; 924 if (hasValue) 925 listData.back() << list_val.back(); 926 listData.back() << hasBounds_; 711 927 if (hasBounds_) 712 { 713 list_msgsVal.back() << list_bounds.back(); 714 } 715 716 eventIndex.push(rank, nbConnectedClients_[rank], list_msgsIndex.back()); 717 eventVal.push(rank, nbConnectedClients_[rank], list_msgsVal.back()); 718 } 719 720 client->sendEvent(eventIndex); 721 client->sendEvent(eventVal); 722 } 723 724 void CAxis::recvIndex(CEventServer& event) 725 { 726 CAxis* axis; 928 listData.back() << list_bounds.back(); 929 930 eventData.push(rank, nbConnectedClients_[rank], listData.back()); 931 } 932 933 client->sendEvent(eventData); 934 } 935 936 void CAxis::recvDistributedAttributes(CEventServer& event) 937 { 938 string axisId; 939 vector<int> ranks; 940 vector<CBufferIn*> buffers; 727 941 728 942 list<CEventServer::SSubEvent>::iterator it; 729 943 for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 730 944 { 945 ranks.push_back(it->rank); 731 946 CBufferIn* buffer = it->buffer; 732 string axisId;733 947 *buffer >> axisId; 734 axis = get(axisId); 735 axis->recvIndex(it->rank, *buffer); 736 } 737 738 if (axis->isCompressible_) 739 { 740 std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end()); 741 948 buffers.push_back(buffer); 949 } 950 get(axisId)->recvDistributedAttributes(ranks, buffers); 951 } 952 953 void CAxis::recvDistributedAttributes(vector<int>& ranks, vector<CBufferIn*> buffers) 954 { 955 int nbReceived = ranks.size(); 956 vector<CArray<int,1> > vec_indi(nbReceived), vec_dataInd(nbReceived), vec_zoomInd(nbReceived); 957 vector<CArray<bool,1> > vec_mask(nbReceived); 958 vector<CArray<double,1> > vec_val(nbReceived); 959 vector<CArray<double,2> > vec_bounds(nbReceived); 960 961 for (int idx = 0; idx < nbReceived; ++idx) 962 { 963 CBufferIn& buffer = *buffers[idx]; 964 buffer >> vec_indi[idx]; 965 buffer >> vec_dataInd[idx]; 966 buffer >> vec_zoomInd[idx]; 967 buffer >> vec_mask[idx]; 968 969 buffer >> hasValue; 970 if (hasValue) 971 buffer >> vec_val[idx]; 972 buffer >> hasBounds_; 973 if (hasBounds_) 974 buffer >> vec_bounds[idx]; 975 } 976 977 int nbData = 0; 978 for (int idx = 0; idx < nbReceived; ++idx) 979 { 980 nbData += vec_indi[idx].numElements(); 981 } 982 983 index.resize(nbData); 984 CArray<int,1> nonCompressedData(nbData); 985 mask.resize(nbData); 986 if (hasValue) 987 value.resize(nbData); 988 if (hasBounds_) 989 bounds.resize(2,nbData); 990 991 nbData = 0; 992 for (int idx = 0; idx < nbReceived; ++idx) 993 { 994 CArray<int,1>& indi = vec_indi[idx]; 995 CArray<int,1>& dataIndi = vec_dataInd[idx]; 996 CArray<bool,1>& maskIndi = vec_mask[idx]; 997 int nb = indi.numElements(); 998 for (int n = 0; n < nb; ++n) 999 { 1000 index(nbData) = indi(n); 1001 nonCompressedData(nbData) = (0 <= dataIndi(n)) ? nbData : -1; 1002 mask(nbData) = maskIndi(n); 1003 if (hasValue) 1004 value(nbData) = vec_val[idx](n); 1005 if (hasBounds_) 1006 { 1007 bounds(0,nbData) = vec_bounds[idx](0,n); 1008 bounds(1,nbData) = vec_bounds[idx](1,n); 1009 } 1010 ++nbData; 1011 } 1012 } 1013 1014 int nbIndex = index.numElements(); 1015 int nbCompressedData = 0; 1016 for (int idx = 0; idx < nonCompressedData.numElements(); ++idx) 1017 { 1018 if (0 <= nonCompressedData(idx) && nonCompressedData(idx) < nbIndex) 1019 ++nbCompressedData; 1020 } 1021 1022 data_index.resize(nbCompressedData); 1023 nbCompressedData = 0; 1024 for (int idx = 0; idx < nonCompressedData.numElements(); ++idx) 1025 { 1026 if (0 <= nonCompressedData(idx) && nonCompressedData(idx) < nbIndex) 1027 { 1028 data_index(nbCompressedData) = nonCompressedData(idx); 1029 ++nbCompressedData; 1030 } 1031 } 1032 1033 int nbZoomIndex = 0; 1034 for (int idx = 0; idx < nbReceived; ++idx) 1035 { 1036 nbZoomIndex += vec_zoomInd[idx].numElements(); 1037 } 1038 1039 zoom_index.resize(nbZoomIndex); 1040 nbZoomIndex = 0; 1041 CArray<int,1>& zoom_Index_Tmp = this->zoom_index; 1042 for (int idx = 0; idx < nbReceived; ++idx) 1043 { 1044 CArray<int,1> tmp = zoom_Index_Tmp(Range(nbZoomIndex, nbZoomIndex + vec_zoomInd[idx].numElements()-1)); 1045 tmp = vec_zoomInd[idx]; 1046 1047 nbZoomIndex += vec_zoomInd[idx].numElements(); 1048 } 1049 1050 1051 { 742 1052 CContextServer* server = CContext::getCurrent()->server; 743 axis->numberWrittenIndexes_ = axis->indexesToWrite.size(); 744 MPI_Allreduce(&axis->numberWrittenIndexes_, &axis->totalNumberWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm); 745 MPI_Scan(&axis->numberWrittenIndexes_, &axis->offsetWrittenIndexes_, 1, MPI_INT, MPI_SUM, server->intraComm); 746 axis->offsetWrittenIndexes_ -= axis->numberWrittenIndexes_; 747 } 748 } 749 750 void CAxis::recvIndex(int rank, CBufferIn& buffer) 751 { 752 buffer >> indiSrv_[rank]; 753 754 if (isCompressible_) 755 { 756 CArray<int, 1> writtenIndexes; 757 buffer >> writtenIndexes; 758 indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements()); 759 for (int i = 0; i < writtenIndexes.numElements(); ++i) 760 indexesToWrite.push_back(writtenIndexes(i)); 761 } 762 } 763 764 void CAxis::recvDistributedValue(CEventServer& event) 765 { 766 list<CEventServer::SSubEvent>::iterator it; 767 for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 768 { 769 CBufferIn* buffer = it->buffer; 770 string axisId; 771 *buffer >> axisId; 772 get(axisId)->recvDistributedValue(it->rank, *buffer); 773 } 774 } 775 776 void CAxis::recvDistributedValue(int rank, CBufferIn& buffer) 777 { 778 CArray<int,1> &indi = indiSrv_[rank]; 779 CArray<double,1> val; 780 CArray<double,2> boundsVal; 781 782 buffer >> val; 783 if (hasBounds_) buffer >> boundsVal; 784 785 int i, j, ind_srv; 786 for (int ind = 0; ind < indi.numElements(); ++ind) 787 { 788 i = indi(ind); 789 ind_srv = i - zoom_begin_srv; 790 value_srv(ind_srv) = val(ind); 791 if (hasBounds_) 792 { 793 bound_srv(0,ind_srv) = boundsVal(0, ind); 794 bound_srv(1,ind_srv) = boundsVal(1, ind); 795 } 796 } 797 } 798 799 void CAxis::recvNonDistributedValue(CEventServer& event) 800 { 801 CAxis* axis; 802 803 list<CEventServer::SSubEvent>::iterator it; 804 for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 805 { 806 CBufferIn* buffer = it->buffer; 807 string axisId; 808 *buffer >> axisId; 809 axis = get(axisId); 810 axis->recvNonDistributedValue(it->rank, *buffer); 811 } 812 813 if (axis->isCompressible_) 814 { 815 std::sort(axis->indexesToWrite.begin(), axis->indexesToWrite.end()); 816 817 axis->numberWrittenIndexes_ = axis->totalNumberWrittenIndexes_ = axis->indexesToWrite.size(); 818 axis->offsetWrittenIndexes_ = 0; 819 } 820 } 821 822 void CAxis::recvNonDistributedValue(int rank, CBufferIn& buffer) 823 { 824 CArray<double,1> val; 825 buffer >> val; 826 827 for (int ind = 0; ind < val.numElements(); ++ind) 828 { 829 value_srv(ind) = val(ind); 830 if (hasBounds_) 831 { 832 bound_srv(0,ind) = bounds(0,ind); 833 bound_srv(1,ind) = bounds(1,ind); 834 } 835 } 836 837 if (isCompressible_) 838 { 839 CArray<int, 1> writtenIndexes; 840 buffer >> writtenIndexes; 841 indexesToWrite.reserve(indexesToWrite.size() + writtenIndexes.numElements()); 842 for (int i = 0; i < writtenIndexes.numElements(); ++i) 843 indexesToWrite.push_back(writtenIndexes(i)); 1053 count_write_index_ = zoom_index.numElements(); 1054 MPI_Scan(&count_write_index_, &start_write_index_, 1, MPI_INT, MPI_SUM, server->intraComm); 1055 global_write_size_ = start_write_index_; 1056 start_write_index_ -= count_write_index_; 1057 local_write_size_ = count_write_index_; 844 1058 } 845 1059 } … … 849 1063 // { 850 1064 // CContext* context = CContext::getCurrent(); 851 // CContextClient* client = context->client; 852 // int nbServer = client->serverSize; 1065 1066 // CContextClient* contextClientTmp = (0 != context->clientPrimServer) ? context->clientPrimServer 1067 // : context->client; 1068 1069 1070 // int nbServer = contextClientTmp->serverSize; 853 1071 854 1072 // CServerDistributionDescription serverDescription(globalDim, nbServer); … … 858 1076 // std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes(); 859 1077 1078 // globalDimGrid.resize(globalDim.size()); 1079 // for (int idx = 0; idx < globalDim.size(); ++idx) globalDimGrid(idx) = globalDim[idx]; 1080 860 1081 // CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT); 861 // if (c lient->isServerLeader())1082 // if (contextClientTmp->isServerLeader()) 862 1083 // { 863 1084 // std::list<CMessage> msgs; 864 1085 865 // const std::list<int>& ranks = c lient->getRanksServerLeader();1086 // const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 866 1087 // for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 867 1088 // { … … 877 1098 // msg << global_zoom_begin.getValue() << global_zoom_n.getValue(); 878 1099 // msg << isCompressible_; 1100 // msg << orderPositionInGrid; 1101 // msg << globalDimGrid; 879 1102 880 1103 // event.push(*itRank,1,msg); 881 1104 // } 882 // c lient->sendEvent(event);1105 // contextClientTmp->sendEvent(event); 883 1106 // } 884 // else c lient->sendEvent(event);1107 // else contextClientTmp->sendEvent(event); 885 1108 // } 886 1109 887 void CAxis::sendServerAttribut(const std::vector<int>& globalDim, int orderPositionInGrid,888 CServerDistributionDescription::ServerDistributionType distType)889 {890 CContext* context = CContext::getCurrent();891 int nbSrvPools = (context->hasServer) ? context->clientPrimServer.size() : 1;892 for (int i = 0; i < nbSrvPools; ++i)1110 // void CAxis::recvServerAttribut(CEventServer& event) 1111 // { 1112 // CBufferIn* buffer = event.subEvents.begin()->buffer; 1113 // string axisId; 1114 // *buffer >> axisId; 1115 // get(axisId)->recvServerAttribut(*buffer); 893 1116 { 894 1117 CContextClient* contextClientTmp = (context->hasServer) ? context->clientPrimServer[i] 895 : context->client; 896 int nbServer = contextClientTmp->serverSize; 897 898 CServerDistributionDescription serverDescription(globalDim, nbServer); 899 serverDescription.computeServerDistribution(); 900 901 std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin(); 902 std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes(); 903 904 globalDimGrid.resize(globalDim.size()); 905 for (int idx = 0; idx < globalDim.size(); ++idx) globalDimGrid(idx) = globalDim[idx]; 906 907 CEventClient event(getType(),EVENT_ID_SERVER_ATTRIBUT); 908 if (contextClientTmp->isServerLeader()) 909 { 910 std::list<CMessage> msgs; 911 912 const std::list<int>& ranks = contextClientTmp->getRanksServerLeader(); 913 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 914 { 915 // Use const int to ensure CMessage holds a copy of the value instead of just a reference 916 const int begin = serverIndexBegin[*itRank][orderPositionInGrid]; 917 const int ni = serverDimensionSizes[*itRank][orderPositionInGrid]; 918 const int end = begin + ni - 1; 919 920 msgs.push_back(CMessage()); 921 CMessage& msg = msgs.back(); 922 msg << this->getId(); 923 msg << ni << begin << end; 924 msg << global_zoom_begin.getValue() << global_zoom_n.getValue(); 925 msg << isCompressible_; 926 msg << orderPositionInGrid; 927 msg << globalDimGrid; 928 929 event.push(*itRank,1,msg); 930 } 931 contextClientTmp->sendEvent(event); 932 } 933 else contextClientTmp->sendEvent(event); 934 } 935 } 936 937 void CAxis::recvServerAttribut(CEventServer& event) 938 { 939 CBufferIn* buffer = event.subEvents.begin()->buffer; 940 string axisId; 941 *buffer >> axisId; 942 get(axisId)->recvServerAttribut(*buffer); 943 944 CContext* context = CContext::getCurrent(); 945 if (context->hasClient && context->hasServer) 946 { 947 std::vector<int> globalDim(get(axisId)->globalDimGrid.numElements()); 948 for (int idx = 0; idx < globalDim.size(); ++idx) globalDim[idx] = get(axisId)->globalDimGrid(idx); 949 get(axisId)->sendServerAttribut(globalDim, get(axisId)->orderPosInGrid, 950 CServerDistributionDescription::BAND_DISTRIBUTION); 951 } 952 } 953 954 void CAxis::recvServerAttribut(CBufferIn& buffer) 955 { 956 int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp; 957 958 buffer >> ni_srv >> begin_srv >> end_srv; 959 buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp; 960 buffer >> isCompressible_; 961 buffer >> orderPosInGrid; 962 buffer >> globalDimGrid; 963 964 global_zoom_begin = global_zoom_begin_tmp; 965 global_zoom_n = global_zoom_n_tmp; 966 int global_zoom_end = global_zoom_begin + global_zoom_n - 1; 967 968 zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ; 969 zoom_end_srv = global_zoom_end < end_srv ? global_zoom_end : end_srv ; 970 zoom_size_srv = zoom_end_srv - zoom_begin_srv + 1; 971 972 if (zoom_size_srv<=0) 973 { 974 zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0; 975 } 976 977 if (n_glo == n) 978 { 979 zoom_begin_srv = global_zoom_begin; 980 zoom_end_srv = global_zoom_end; //zoom_end; 981 zoom_size_srv = zoom_end_srv - zoom_begin_srv + 1; 982 } 983 if (hasValue) 984 { 985 value_srv.resize(zoom_size_srv); 986 if (hasBounds_) bound_srv.resize(2,zoom_size_srv); 987 } 988 } 1118 } 1119 1120 // CContext* context = CContext::getCurrent(); 1121 // if (context->hasClient && context->hasServer) 1122 // { 1123 // std::vector<int> globalDim(get(axisId)->globalDimGrid.numElements()); 1124 // for (int idx = 0; idx < globalDim.size(); ++idx) globalDim[idx] = get(axisId)->globalDimGrid(idx); 1125 // get(axisId)->sendServerAttribut(globalDim, get(axisId)->orderPosInGrid, 1126 // CServerDistributionDescription::BAND_DISTRIBUTION); 1127 // } 1128 // } 1129 1130 // void CAxis::recvServerAttribut(CBufferIn& buffer) 1131 // { 1132 // int ni_srv, begin_srv, end_srv, global_zoom_begin_tmp, global_zoom_n_tmp; 1133 1134 // buffer >> ni_srv >> begin_srv >> end_srv; 1135 // buffer >> global_zoom_begin_tmp >> global_zoom_n_tmp; 1136 // buffer >> isCompressible_; 1137 // buffer >> orderPosInGrid; 1138 // buffer >> globalDimGrid; 1139 1140 // global_zoom_begin = global_zoom_begin_tmp; 1141 // global_zoom_n = global_zoom_n_tmp; 1142 // int global_zoom_end = global_zoom_begin + global_zoom_n - 1; 1143 1144 // zoom_begin_srv = global_zoom_begin > begin_srv ? global_zoom_begin : begin_srv ; 1145 // zoom_end_srv = global_zoom_end < end_srv ? global_zoom_end : end_srv ; 1146 // zoom_size_srv = zoom_end_srv - zoom_begin_srv + 1; 1147 1148 // if (zoom_size_srv<=0) 1149 // { 1150 // zoom_begin_srv = 0; zoom_end_srv = 0; zoom_size_srv = 0; 1151 // } 1152 1153 // if (n_glo == n) 1154 // { 1155 // zoom_begin_srv = global_zoom_begin; 1156 // zoom_end_srv = global_zoom_end; //zoom_end; 1157 // zoom_size_srv = zoom_end_srv - zoom_begin_srv + 1; 1158 // } 1159 // if (hasValue) 1160 // { 1161 // value_srv.resize(zoom_size_srv); 1162 // if (hasBounds_) bound_srv.resize(2,zoom_size_srv); 1163 // } 1164 // } 989 1165 990 1166 CTransformation<CAxis>* CAxis::addTransformation(ETranformationType transType, const StdString& id) … … 1007 1183 { 1008 1184 return transformationMap_; 1009 }1010 1011 /*!1012 Check the validity of all transformations applied on axis1013 This functions is called AFTER all inherited attributes are solved1014 */1015 void CAxis::checkTransformations()1016 {1017 TransMapTypes::const_iterator itb = transformationMap_.begin(), it,1018 ite = transformationMap_.end();1019 // for (it = itb; it != ite; ++it)1020 // {1021 // (it->second)->checkValid(this);1022 // }1023 1185 } 1024 1186
Note: See TracChangeset
for help on using the changeset viewer.