- Timestamp:
- 11/05/20 16:39:16 (4 years ago)
- Location:
- XIOS/dev/dev_ym/XIOS_COUPLING/src/node
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/axis.cpp
r1956 r1975 20 20 CAxis::CAxis(void) 21 21 : CObjectTemplate<CAxis>() 22 , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false) 23 , isClientAfterTransformationChecked(false) 22 , CAxisAttributes(), isChecked(false), relFiles() 24 23 , hasBounds(false), isCompressible_(false) 25 , numberWrittenIndexes_(), totalNumberWrittenIndexes_(), offsetWrittenIndexes_()26 24 , transformationMap_(), hasValue(false), hasLabel(false) 27 , computedWrittenIndex_(false) 28 , clients() 25 , clients() 29 26 { 30 27 } … … 32 29 CAxis::CAxis(const StdString & id) 33 30 : CObjectTemplate<CAxis>(id) 34 , CAxisAttributes(), isChecked(false), relFiles(), areClientAttributesChecked_(false) 35 , isClientAfterTransformationChecked(false) 31 , CAxisAttributes(), isChecked(false), relFiles() 36 32 , hasBounds(false), isCompressible_(false) 37 , numberWrittenIndexes_(), totalNumberWrittenIndexes_(), offsetWrittenIndexes_()38 33 , transformationMap_(), hasValue(false), hasLabel(false) 39 , computedWrittenIndex_(false) 40 , clients() 34 , clients() 41 35 { 42 36 } … … 131 125 CATCH_DUMP_ATTR 132 126 133 //---------------------------------------------------------------- 134 135 /*! 136 Returns the number of indexes written by each server. 137 \return the number of indexes written by each server 138 */ 139 int CAxis::getNumberWrittenIndexes(MPI_Comm writtenCom) 140 TRY 141 { 142 int writtenSize; 143 MPI_Comm_size(writtenCom, &writtenSize); 144 return numberWrittenIndexes_[writtenSize]; 145 } 146 CATCH_DUMP_ATTR 147 148 /*! 149 Returns the total number of indexes written by the servers. 150 \return the total number of indexes written by the servers 151 */ 152 int CAxis::getTotalNumberWrittenIndexes(MPI_Comm writtenCom) 153 TRY 154 { 155 int writtenSize; 156 MPI_Comm_size(writtenCom, &writtenSize); 157 return totalNumberWrittenIndexes_[writtenSize]; 158 } 159 CATCH_DUMP_ATTR 160 161 /*! 162 Returns the offset of indexes written by each server. 163 \return the offset of indexes written by each server 164 */ 165 int CAxis::getOffsetWrittenIndexes(MPI_Comm writtenCom) 166 TRY 167 { 168 int writtenSize; 169 MPI_Comm_size(writtenCom, &writtenSize); 170 return offsetWrittenIndexes_[writtenSize]; 171 } 172 CATCH_DUMP_ATTR 173 174 CArray<int, 1>& CAxis::getCompressedIndexToWriteOnServer(MPI_Comm writtenCom) 175 TRY 176 { 177 int writtenSize; 178 MPI_Comm_size(writtenCom, &writtenSize); 179 return compressedIndexToWriteOnServer[writtenSize]; 180 } 181 CATCH_DUMP_ATTR 182 183 //---------------------------------------------------------------- 127 //---------------------------------------------------------------- 184 128 185 129 /*! … … 481 425 switch(event.type) 482 426 { 483 case EVENT_ID_DISTRIBUTION_ATTRIBUTE :484 recvDistributionAttribute(event);485 return true;486 break;487 case EVENT_ID_NON_DISTRIBUTED_ATTRIBUTES:488 recvNonDistributedAttributes(event);489 return true;490 break;491 case EVENT_ID_DISTRIBUTED_ATTRIBUTES:492 recvDistributedAttributes_old(event);493 return true;494 break;495 427 case EVENT_ID_AXIS_DISTRIBUTION: 496 428 recvAxisDistribution(event); … … 510 442 CATCH 511 443 512 /*! 513 Check attributes on client side (This name is still adequate???) 514 */ 515 void CAxis::checkAttributesOnClient() 516 TRY 517 { 518 if (this->areClientAttributesChecked_) return; 519 520 CContext* context=CContext::getCurrent(); 521 if (context->getServiceType()==CServicesManager::CLIENT) this->checkAttributes(); 522 523 this->areClientAttributesChecked_ = true; 524 } 525 CATCH_DUMP_ATTR 526 527 /* 528 The (spatial) transformation sometimes can change attributes of an axis (e.g zoom can change mask or generate can change whole attributes) 529 Therefore, we should recheck them. 530 */ 531 // ym : obsolete to be removed 532 void CAxis::checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid, 533 CServerDistributionDescription::ServerDistributionType distType) 534 TRY 535 { 536 CContext* context=CContext::getCurrent() ; 537 538 if (this->isClientAfterTransformationChecked) return; 539 if (context->getServiceType()==CServicesManager::CLIENT || context->getServiceType()==CServicesManager::GATHERER) 540 { 541 /* suppressed because of interface changed 542 if (orderPositionInGrid == CServerDistributionDescription::defaultDistributedDimension(globalDim.size(), distType)) 543 computeConnectedClients(globalDim, orderPositionInGrid, distType); 544 else if (index.numElements() != n_glo) computeConnectedClients(globalDim, orderPositionInGrid, CServerDistributionDescription::ROOT_DISTRIBUTION); 545 */ 546 } 547 548 this->isClientAfterTransformationChecked = true; 549 } 550 CATCH_DUMP_ATTR 551 552 /* 553 Send all checked attributes to server? (We dont have notion of server any more so client==server) 554 \param [in] globalDim global dimension of grid containing this axis 555 \param [in] orderPositionInGrid the relative order of this axis in the grid (e.g grid composed of domain+axis -> orderPositionInGrid is 2) 556 \param [in] distType distribution type of the server. For now, we only have band distribution. 557 558 */ 559 //ym obsolete : to be removed 560 void CAxis::sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid, 561 CServerDistributionDescription::ServerDistributionType distType) 562 TRY 563 { 564 if (!this->areClientAttributesChecked_) checkAttributesOnClient(); 565 if (!this->isClientAfterTransformationChecked) checkAttributesOnClientAfterTransformation(globalDim, orderPositionInGrid, distType); 566 CContext* context = CContext::getCurrent(); 567 568 if (this->isChecked) return; 569 if (context->getServiceType()==CServicesManager::CLIENT || context->getServiceType()==CServicesManager::GATHERER) /*sendAttributes(globalDim, orderPositionInGrid, distType)*/; 570 571 this->isChecked = true; 572 } 573 CATCH_DUMP_ATTR 574 575 576 void CAxis::sendAxisToFileServer(CContextClient* client, const std::vector<int>& globalDim, int orderPositionInGrid) 577 { 578 if (sendAxisToFileServer_done_.count(client)!=0) return ; 579 else sendAxisToFileServer_done_.insert(client) ; 580 581 StdString axisDefRoot("axis_definition"); 582 CAxisGroup* axisPtr = CAxisGroup::get(axisDefRoot); 583 axisPtr->sendCreateChild(this->getId(),client); 584 this->sendAllAttributesToServer(client) ; 585 this->sendAttributes(client, globalDim, orderPositionInGrid, CServerDistributionDescription::BAND_DISTRIBUTION) ; 586 } 587 444 /* to remove later when reimplementing coupling */ 588 445 void CAxis::sendAxisToCouplerOut(CContextClient* client, const std::vector<int>& globalDim, int orderPositionInGrid, const string& fieldId, int posInGrid) 589 446 { 590 if (sendAxisTo FileServer_done_.count(client)!=0) return ;591 else sendAxisTo FileServer_done_.insert(client) ;447 if (sendAxisToCouplerOut_done_.count(client)!=0) return ; 448 else sendAxisToCouplerOut_done_.insert(client) ; 592 449 593 450 string axisId="_axis["+std::to_string(posInGrid)+"]_of_"+fieldId ; 594 451 595 if (!axis_ref.isEmpty()) 596 { 597 auto axis_ref_tmp=axis_ref.getValue() ; 598 axis_ref.reset() ; // remove the reference, find an other way to do that more cleanly 599 this->sendAllAttributesToServer(client, axisId) ; 600 axis_ref = axis_ref_tmp ; 601 } 602 else this->sendAllAttributesToServer(client, axisId) ; 603 604 this->sendAttributes(client, globalDim, orderPositionInGrid, CServerDistributionDescription::BAND_DISTRIBUTION, axisId) ; 605 } 452 } 606 453 607 454 void CAxis::makeAliasForCoupling(const string& fieldId, int posInGrid) … … 611 458 } 612 459 613 /*! 614 Send attributes from one client to other clients 615 \param[in] globalDim global dimension of grid which contains this axis 616 \param[in] order 617 */ 618 void CAxis::sendAttributes(CContextClient* client, const std::vector<int>& globalDim, int orderPositionInGrid, 619 CServerDistributionDescription::ServerDistributionType distType, const string& axisId) 620 TRY 621 { 622 sendDistributionAttribute(client, globalDim, orderPositionInGrid, distType, axisId); 623 624 // if (index.numElements() == n_glo.getValue()) 625 if ((orderPositionInGrid == CServerDistributionDescription::defaultDistributedDimension(globalDim.size(), distType)) 626 || (index.numElements() != n_glo)) 627 { 628 sendDistributedAttributes_old(client, axisId); 629 } 630 else 631 { 632 sendNonDistributedAttributes(client, axisId); 633 } 634 } 635 CATCH_DUMP_ATTR 636 637 /* 638 Compute the connection between group of clients (or clients/servers). 639 (E.g: Suppose we have 2 group of clients in two model: A (client role) connect to B (server role), 640 this function calculate number of clients B connect to one client of A) 641 \param [in] globalDim global dimension of grid containing this axis 642 \param [in] orderPositionInGrid the relative order of this axis in the grid (e.g grid composed of domain+axis -> orderPositionInGrid is 2) 643 \param [in] distType distribution type of the server. For now, we only have band distribution. 644 */ 645 void CAxis::computeConnectedClients(CContextClient* client, const std::vector<int>& globalDim, int orderPositionInGrid) 646 TRY 647 { 648 if (computeConnectedClients_done_.count(client)!=0) return ; 649 else computeConnectedClients_done_.insert(client) ; 650 651 CContext* context = CContext::getCurrent(); 652 CServerDistributionDescription::ServerDistributionType distType ; 653 int defaultDistributedPos = CServerDistributionDescription::defaultDistributedDimension(globalDim.size(), CServerDistributionDescription::BAND_DISTRIBUTION) ; 654 655 if (orderPositionInGrid == defaultDistributedPos) distType = CServerDistributionDescription::BAND_DISTRIBUTION ; 656 else if (index.numElements() != n_glo) distType = CServerDistributionDescription::ROOT_DISTRIBUTION ; 657 else return ; 658 659 int nbServer = client->serverSize; 660 int range, clientSize = client->clientSize; 661 int rank = client->clientRank; 662 663 if (listNbServer_.count(nbServer) == 0) 664 { 665 listNbServer_.insert(nbServer) ; 666 667 if (connectedServerRank_.find(nbServer) != connectedServerRank_.end()) 668 { 669 nbSenders.erase(nbServer); 670 connectedServerRank_.erase(nbServer); 671 } 672 673 size_t ni = this->n.getValue(); 674 size_t ibegin = this->begin.getValue(); 675 size_t nbIndex = index.numElements(); 676 677 // First of all, we should compute the mapping of the global index and local index of the current client 678 if (globalLocalIndexMap_.empty()) 679 { 680 for (size_t idx = 0; idx < nbIndex; ++idx) 681 { 682 globalLocalIndexMap_[index(idx)] = idx; 683 } 684 } 685 686 // Calculate the compressed index if any 687 // std::set<int> writtenInd; 688 // if (isCompressible_) 689 // { 690 // for (int idx = 0; idx < data_index.numElements(); ++idx) 691 // { 692 // int ind = CDistributionClient::getAxisIndex(data_index(idx), data_begin, ni); 693 // 694 // if (ind >= 0 && ind < ni && mask(ind)) 695 // { 696 // ind += ibegin; 697 // writtenInd.insert(ind); 698 // } 699 // } 700 // } 701 702 // Compute the global index of the current client (process) hold 703 std::vector<int> nGlobAxis(1); 704 nGlobAxis[0] = n_glo.getValue(); 705 706 size_t globalSizeIndex = 1, indexBegin, indexEnd; 707 for (int i = 0; i < nGlobAxis.size(); ++i) globalSizeIndex *= nGlobAxis[i]; 708 indexBegin = 0; 709 if (globalSizeIndex <= clientSize) 710 { 711 indexBegin = rank%globalSizeIndex; 712 indexEnd = indexBegin; 713 } 714 else 715 { 716 for (int i = 0; i < clientSize; ++i) 717 { 718 range = globalSizeIndex / clientSize; 719 if (i < (globalSizeIndex%clientSize)) ++range; 720 if (i == client->clientRank) break; 721 indexBegin += range; 722 } 723 indexEnd = indexBegin + range - 1; 724 } 725 726 CArray<size_t,1> globalIndex(index.numElements()); 727 for (size_t idx = 0; idx < globalIndex.numElements(); ++idx) 728 globalIndex(idx) = index(idx); 729 730 // Describe the distribution of server side 731 732 CServerDistributionDescription serverDescription(nGlobAxis, nbServer, distType); 733 734 std::vector<int> serverZeroIndex; 735 serverZeroIndex = serverDescription.computeServerGlobalIndexInRange(std::make_pair<size_t&,size_t&>(indexBegin, indexEnd), 0); 736 737 std::list<int> serverZeroIndexLeader; 738 std::list<int> serverZeroIndexNotLeader; 739 CContextClient::computeLeader(client->clientRank, client->clientSize, serverZeroIndex.size(), serverZeroIndexLeader, serverZeroIndexNotLeader); 740 for (std::list<int>::iterator it = serverZeroIndexLeader.begin(); it != serverZeroIndexLeader.end(); ++it) 741 *it = serverZeroIndex[*it]; 742 743 // Find out the connection between client and server side 744 CClientServerMapping* clientServerMap = new CClientServerMappingDistributed(serverDescription.getGlobalIndexRange(), client->intraComm); 745 clientServerMap->computeServerIndexMapping(globalIndex, nbServer); 746 CClientServerMapping::GlobalIndexMap& globalIndexAxisOnServer = clientServerMap->getGlobalIndexOnServer(); 747 748 indSrv_[nbServer].swap(globalIndexAxisOnServer); 749 750 if (distType==CServerDistributionDescription::ROOT_DISTRIBUTION) 751 { 752 for(int i=1; i<nbServer; ++i) indSrv_[nbServer].insert(pair<int, vector<size_t> >(i,indSrv_[nbServer][0]) ) ; 753 serverZeroIndexLeader.clear() ; 754 } 755 756 CClientServerMapping::GlobalIndexMap::const_iterator it = indSrv_[nbServer].begin(), 757 ite = indSrv_[nbServer].end(); 758 759 for (it = indSrv_[nbServer].begin(); it != ite; ++it) connectedServerRank_[nbServer].push_back(it->first); 760 761 for (std::list<int>::const_iterator it = serverZeroIndexLeader.begin(); it != serverZeroIndexLeader.end(); ++it) 762 connectedServerRank_[nbServer].push_back(*it); 763 764 // Even if a client has no index, it must connect to at least one server and 765 // send an "empty" data to this server 766 if (connectedServerRank_[nbServer].empty()) 767 connectedServerRank_[nbServer].push_back(client->clientRank % client->serverSize); 768 769 nbSenders[nbServer] = CClientServerMapping::computeConnectedClients(client->serverSize, client->clientSize, client->intraComm, connectedServerRank_[nbServer]); 770 771 delete clientServerMap; 772 } 773 } 774 CATCH_DUMP_ATTR 775 776 /* 777 Compute the index of data to write into file 778 (Different from the previous version, this version of XIOS allows data be written into file (classical role), 779 or transfered to another clients) 780 */ 781 void CAxis::computeWrittenIndex() 782 TRY 783 { 784 if (computedWrittenIndex_) return; 785 computedWrittenIndex_ = true; 786 787 CContext* context=CContext::getCurrent(); 788 789 // We describe the distribution of client (server) on which data are written 790 std::vector<int> nBegin(1), nSize(1), nBeginGlobal(1), nGlob(1); 791 nBegin[0] = begin; 792 nSize[0] = n; 793 nBeginGlobal[0] = 0; 794 nGlob[0] = n_glo; 795 CDistributionServer srvDist(context->intraCommSize_, nBegin, nSize, nBeginGlobal, nGlob); 796 const CArray<size_t,1>& writtenGlobalIndex = srvDist.getGlobalIndex(); 797 798 // Because all written data are local on a client, 799 // we need to compute the local index on the server from its corresponding global index 800 size_t nbWritten = 0, indGlo; 801 std::unordered_map<size_t,size_t>::const_iterator itb = globalLocalIndexMap_.begin(), 802 ite = globalLocalIndexMap_.end(), it; 803 CArray<size_t,1>::const_iterator itSrvb = writtenGlobalIndex.begin(), 804 itSrve = writtenGlobalIndex.end(), itSrv; 805 806 localIndexToWriteOnServer.resize(writtenGlobalIndex.numElements()); 807 nbWritten = 0; 808 for (itSrv = itSrvb; itSrv != itSrve; ++itSrv) 809 { 810 indGlo = *itSrv; 811 if (ite != globalLocalIndexMap_.find(indGlo)) 812 { 813 localIndexToWriteOnServer(nbWritten) = globalLocalIndexMap_[indGlo]; 814 } 815 else 816 { 817 localIndexToWriteOnServer(nbWritten) = -1; 818 } 819 ++nbWritten; 820 } 821 822 } 823 CATCH_DUMP_ATTR 824 825 void CAxis::computeWrittenCompressedIndex(MPI_Comm writtenComm) 826 TRY 827 { 828 int writtenCommSize; 829 MPI_Comm_size(writtenComm, &writtenCommSize); 830 if (compressedIndexToWriteOnServer.find(writtenCommSize) != compressedIndexToWriteOnServer.end()) 831 return; 832 833 if (isCompressible()) 834 { 835 size_t nbWritten = 0, indGlo; 836 CContext* context=CContext::getCurrent(); 837 838 // We describe the distribution of client (server) on which data are written 839 std::vector<int> nBegin(1), nSize(1), nBeginGlobal(1), nGlob(1); 840 nBegin[0] = 0; 841 nSize[0] = n; 842 nBeginGlobal[0] = 0; 843 nGlob[0] = n_glo; 844 CDistributionServer srvDist(context->intraCommSize_, nBegin, nSize, nBeginGlobal, nGlob); 845 const CArray<size_t,1>& writtenGlobalIndex = srvDist.getGlobalIndex(); 846 std::unordered_map<size_t,size_t>::const_iterator itb = globalLocalIndexMap_.begin(), 847 ite = globalLocalIndexMap_.end(), it; 848 849 CArray<size_t,1>::const_iterator itSrvb = writtenGlobalIndex.begin(), 850 itSrve = writtenGlobalIndex.end(), itSrv; 851 std::unordered_map<size_t,size_t> localGlobalIndexMap; 852 for (itSrv = itSrvb; itSrv != itSrve; ++itSrv) 853 { 854 indGlo = *itSrv; 855 if (ite != globalLocalIndexMap_.find(indGlo)) 856 { 857 localGlobalIndexMap[localIndexToWriteOnServer(nbWritten)] = indGlo; 858 ++nbWritten; 859 } 860 } 861 // 862 // nbWritten = 0; 863 // for (int idx = 0; idx < data_index.numElements(); ++idx) 864 // { 865 // if (localGlobalIndexMap.end() != localGlobalIndexMap.find(data_index(idx))) 866 // { 867 // ++nbWritten; 868 // } 869 // } 870 // 871 // compressedIndexToWriteOnServer[writtenCommSize].resize(nbWritten); 872 // nbWritten = 0; 873 // for (int idx = 0; idx < data_index.numElements(); ++idx) 874 // { 875 // if (localGlobalIndexMap.end() != localGlobalIndexMap.find(data_index(idx))) 876 // { 877 // compressedIndexToWriteOnServer[writtenCommSize](nbWritten) = localGlobalIndexMap[data_index(idx)]; 878 // ++nbWritten; 879 // } 880 // } 881 882 nbWritten = 0; 883 for (int idx = 0; idx < data_index.numElements(); ++idx) 884 { 885 if (localGlobalIndexMap.end() != localGlobalIndexMap.find(data_index(idx))) 886 { 887 ++nbWritten; 888 } 889 } 890 891 compressedIndexToWriteOnServer[writtenCommSize].resize(nbWritten); 892 nbWritten = 0; 893 for (int idx = 0; idx < data_index.numElements(); ++idx) 894 { 895 if (localGlobalIndexMap.end() != localGlobalIndexMap.find(data_index(idx))) 896 { 897 compressedIndexToWriteOnServer[writtenCommSize](nbWritten) = localGlobalIndexMap[data_index(idx)]; 898 ++nbWritten; 899 } 900 } 901 902 numberWrittenIndexes_[writtenCommSize] = nbWritten; 903 904 bool distributed_glo, distributed=isDistributed() ; 905 MPI_Allreduce(&distributed,&distributed_glo, 1, MPI_INT, MPI_LOR, writtenComm) ; 906 if (distributed_glo) 907 { 908 909 MPI_Allreduce(&numberWrittenIndexes_[writtenCommSize], &totalNumberWrittenIndexes_[writtenCommSize], 1, MPI_INT, MPI_SUM, writtenComm); 910 MPI_Scan(&numberWrittenIndexes_[writtenCommSize], &offsetWrittenIndexes_[writtenCommSize], 1, MPI_INT, MPI_SUM, writtenComm); 911 offsetWrittenIndexes_[writtenCommSize] -= numberWrittenIndexes_[writtenCommSize]; 912 } 913 else 914 totalNumberWrittenIndexes_[writtenCommSize] = numberWrittenIndexes_[writtenCommSize]; 915 } 916 } 917 CATCH_DUMP_ATTR 918 919 /*! 920 Send distribution information from a group of client (client role) to another group of client (server role) 921 The distribution of a group of client (server role) is imposed by the group of client (client role) 922 \param [in] globalDim global dimension of grid containing this axis 923 \param [in] orderPositionInGrid the relative order of this axis in the grid (e.g grid composed of domain+axis -> orderPositionInGrid is 2) 924 \param [in] distType distribution type of the server. For now, we only have band distribution. 925 */ 926 void CAxis::sendDistributionAttribute(CContextClient* client, const std::vector<int>& globalDim, int orderPositionInGrid, 927 CServerDistributionDescription::ServerDistributionType distType, const string& axisId) 928 TRY 929 { 930 string serverAxisId = axisId.empty() ? this->getId() : axisId ; 931 int nbServer = client->serverSize; 932 933 CServerDistributionDescription serverDescription(globalDim, nbServer); 934 serverDescription.computeServerDistribution(); 935 936 std::vector<std::vector<int> > serverIndexBegin = serverDescription.getServerIndexBegin(); 937 std::vector<std::vector<int> > serverDimensionSizes = serverDescription.getServerDimensionSizes(); 938 939 CEventClient event(getType(),EVENT_ID_DISTRIBUTION_ATTRIBUTE); 940 if (client->isServerLeader()) 941 { 942 std::list<CMessage> msgs; 943 944 const std::list<int>& ranks = client->getRanksServerLeader(); 945 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 946 { 947 // Use const int to ensure CMessage holds a copy of the value instead of just a reference 948 const int begin = serverIndexBegin[*itRank][orderPositionInGrid]; 949 const int ni = serverDimensionSizes[*itRank][orderPositionInGrid]; 950 951 msgs.push_back(CMessage()); 952 CMessage& msg = msgs.back(); 953 msg << serverAxisId; 954 msg << ni << begin; 955 msg << isCompressible_; 956 957 event.push(*itRank,1,msg); 958 } 959 client->sendEvent(event); 960 } 961 else client->sendEvent(event); 962 } 963 CATCH_DUMP_ATTR 964 965 /* 966 Receive distribution attribute from another client 967 \param [in] event event containing data of these attributes 968 */ 969 void CAxis::recvDistributionAttribute(CEventServer& event) 970 TRY 971 { 972 CBufferIn* buffer = event.subEvents.begin()->buffer; 973 string axisId; 974 *buffer >> axisId; 975 get(axisId)->recvDistributionAttribute(*buffer); 976 } 977 CATCH 978 979 /* 980 Receive distribution attribute from another client 981 \param [in] buffer buffer containing data of these attributes 982 */ 983 void CAxis::recvDistributionAttribute(CBufferIn& buffer) 984 TRY 985 { 986 int ni_srv, begin_srv; 987 buffer >> ni_srv >> begin_srv; 988 buffer >> isCompressible_; 989 990 // Set up new local size of axis on the receiving clients 991 n.setValue(ni_srv); 992 begin.setValue(begin_srv); 993 } 994 CATCH_DUMP_ATTR 995 996 /* 997 Send attributes of axis from a group of client to other group of clients/servers 998 on supposing that these attributes are not distributed among the sending group 999 In the future, if new attributes are added, they should also be processed in this function 1000 */ 1001 void CAxis::sendNonDistributedAttributes(CContextClient* client, const string& axisId) 1002 TRY 1003 { 1004 string serverAxisId = axisId.empty() ? this->getId() : axisId ; 1005 1006 CEventClient event(getType(), EVENT_ID_NON_DISTRIBUTED_ATTRIBUTES); 1007 size_t nbIndex = index.numElements(); 1008 size_t nbDataIndex = 0; 1009 1010 for (int idx = 0; idx < data_index.numElements(); ++idx) 1011 { 1012 int ind = data_index(idx); 1013 if (ind >= 0 && ind < nbIndex) ++nbDataIndex; 1014 } 1015 1016 CArray<int,1> dataIndex(nbDataIndex); 1017 nbDataIndex = 0; 1018 for (int idx = 0; idx < data_index.numElements(); ++idx) 1019 { 1020 int ind = data_index(idx); 1021 if (ind >= 0 && ind < nbIndex) 1022 { 1023 dataIndex(nbDataIndex) = ind; 1024 ++nbDataIndex; 1025 } 1026 } 1027 1028 if (client->isServerLeader()) 1029 { 1030 std::list<CMessage> msgs; 1031 1032 const std::list<int>& ranks = client->getRanksServerLeader(); 1033 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 1034 { 1035 msgs.push_back(CMessage()); 1036 CMessage& msg = msgs.back(); 1037 msg << serverAxisId; 1038 msg << index.getValue() << dataIndex << mask.getValue(); 1039 msg << hasValue; 1040 if (hasValue) msg << value.getValue(); 1041 msg << hasBounds; 1042 if (hasBounds) msg << bounds.getValue(); 1043 msg << hasLabel; 1044 if (hasLabel) msg << label.getValue(); 1045 1046 event.push(*itRank, 1, msg); 1047 } 1048 client->sendEvent(event); 1049 } 1050 else client->sendEvent(event); 1051 } 1052 CATCH_DUMP_ATTR 1053 1054 /* 1055 Receive the non-distributed attributes from another group of clients 1056 \param [in] event event containing data of these attributes 1057 */ 1058 void CAxis::recvNonDistributedAttributes(CEventServer& event) 1059 TRY 1060 { 1061 list<CEventServer::SSubEvent>::iterator it; 1062 for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 1063 { 1064 CBufferIn* buffer = it->buffer; 1065 string axisId; 1066 *buffer >> axisId; 1067 get(axisId)->recvNonDistributedAttributes(it->rank, *buffer); 1068 } 1069 } 1070 CATCH 1071 1072 /* 1073 Receive the non-distributed attributes from another group of clients 1074 \param [in] rank rank of the sender 1075 \param [in] buffer buffer containing data sent from the sender 1076 */ 1077 void CAxis::recvNonDistributedAttributes(int rank, CBufferIn& buffer) 1078 TRY 1079 { 1080 CArray<int,1> tmp_index, tmp_data_index; 1081 CArray<bool,1> tmp_mask; 1082 CArray<double,1> tmp_val; 1083 CArray<double,2> tmp_bnds; 1084 CArray<string,1> tmp_label; 1085 1086 buffer >> tmp_index; 1087 index.reference(tmp_index); 1088 buffer >> tmp_data_index; 1089 data_index.reference(tmp_data_index); 1090 buffer >> tmp_mask; 1091 mask.reference(tmp_mask); 1092 1093 buffer >> hasValue; 1094 if (hasValue) 1095 { 1096 buffer >> tmp_val; 1097 value.reference(tmp_val); 1098 } 1099 1100 buffer >> hasBounds; 1101 if (hasBounds) 1102 { 1103 buffer >> tmp_bnds; 1104 bounds.reference(tmp_bnds); 1105 } 1106 1107 buffer >> hasLabel; 1108 if (hasLabel) 1109 { 1110 buffer >> tmp_label; 1111 label.reference(tmp_label); 1112 } 1113 1114 // Some value should be reset here 1115 data_begin.setValue(0); 1116 data_n.setValue(data_index.numElements()); 1117 globalLocalIndexMap_.rehash(std::ceil(index.numElements()/globalLocalIndexMap_.max_load_factor())); 1118 // for (int idx = 0; idx < index.numElements(); ++idx) globalLocalIndexMap_[idx] = index(idx); 1119 for (int idx = 0; idx < index.numElements(); ++idx) globalLocalIndexMap_[index(idx)] = idx; 1120 } 1121 CATCH_DUMP_ATTR 1122 1123 /* 1124 Send axis attributes from a group of clients to another group of clients/servers 1125 supposing that these attributes are distributed among the clients of the sending group 1126 In future, if new attributes are added, they should also be processed in this function 1127 */ 1128 void CAxis::sendDistributedAttributes_old(CContextClient* client, const string& axisId) 1129 TRY 1130 { 1131 string serverAxisId = axisId.empty() ? this->getId() : axisId ; 1132 1133 int ind, idx; 1134 int nbServer = client->serverSize; 1135 1136 CEventClient eventData(getType(), EVENT_ID_DISTRIBUTED_ATTRIBUTES); 1137 1138 list<CMessage> listData; 1139 list<CArray<int,1> > list_indi, list_dataInd; 1140 list<CArray<double,1> > list_val; 1141 list<CArray<double,2> > list_bounds; 1142 list<CArray<string,1> > list_label; 1143 1144 // Cut off the ghost points 1145 int nbIndex = index.numElements(); 1146 CArray<int,1> dataIndex(nbIndex); 1147 dataIndex = -1; 1148 for (idx = 0; idx < data_index.numElements(); ++idx) 1149 { 1150 if (0 <= data_index(idx) && data_index(idx) < nbIndex) 1151 dataIndex(data_index(idx)) = 1; 1152 } 1153 1154 std::unordered_map<int, std::vector<size_t> >::const_iterator it, iteMap; 1155 iteMap = indSrv_[nbServer].end(); 1156 for (int k = 0; k < connectedServerRank_[nbServer].size(); ++k) 1157 { 1158 int nbData = 0, nbDataCount = 0; 1159 int rank = connectedServerRank_[nbServer][k]; 1160 it = indSrv_[nbServer].find(rank); 1161 if (iteMap != it) 1162 nbData = it->second.size(); 1163 1164 list_indi.push_back(CArray<int,1>(nbData)); 1165 list_dataInd.push_back(CArray<int,1>(nbData)); 1166 1167 if (hasValue) 1168 list_val.push_back(CArray<double,1>(nbData)); 1169 1170 if (hasBounds) 1171 list_bounds.push_back(CArray<double,2>(2,nbData)); 1172 1173 if (hasLabel) 1174 list_label.push_back(CArray<string,1>(nbData)); 1175 1176 CArray<int,1>& indi = list_indi.back(); 1177 CArray<int,1>& dataIndi = list_dataInd.back(); 1178 dataIndi = -1; 1179 1180 for (int n = 0; n < nbData; ++n) 1181 { 1182 idx = static_cast<int>(it->second[n]); 1183 indi(n) = idx; 1184 1185 ind = globalLocalIndexMap_[idx]; 1186 dataIndi(n) = dataIndex(ind); 1187 1188 if (hasValue) 1189 { 1190 CArray<double,1>& val = list_val.back(); 1191 val(n) = value(ind); 1192 } 1193 1194 if (hasBounds) 1195 { 1196 CArray<double,2>& boundsVal = list_bounds.back(); 1197 boundsVal(0, n) = bounds(0,ind); 1198 boundsVal(1, n) = bounds(1,ind); 1199 } 1200 1201 if (hasLabel) 1202 { 1203 CArray<string,1>& labelVal = list_label.back(); 1204 labelVal(n) = label(ind); 1205 } 1206 } 1207 1208 listData.push_back(CMessage()); 1209 listData.back() << serverAxisId 1210 << list_indi.back() << list_dataInd.back(); 1211 1212 listData.back() << hasValue; 1213 if (hasValue) 1214 listData.back() << list_val.back(); 1215 1216 listData.back() << hasBounds; 1217 if (hasBounds) 1218 listData.back() << list_bounds.back(); 1219 1220 listData.back() << hasLabel; 1221 if (hasLabel) 1222 listData.back() << list_label.back(); 1223 1224 eventData.push(rank, nbSenders[nbServer][rank], listData.back()); 1225 } 1226 1227 client->sendEvent(eventData); 1228 } 1229 CATCH_DUMP_ATTR 1230 1231 /* 1232 Receive the distributed attributes from another group of clients 1233 \param [in] event event containing data of these attributes 1234 */ 1235 void CAxis::recvDistributedAttributes_old(CEventServer& event) 1236 TRY 1237 { 1238 string axisId; 1239 vector<int> ranks; 1240 vector<CBufferIn*> buffers; 1241 1242 list<CEventServer::SSubEvent>::iterator it; 1243 for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 1244 { 1245 ranks.push_back(it->rank); 1246 CBufferIn* buffer = it->buffer; 1247 *buffer >> axisId; 1248 buffers.push_back(buffer); 1249 } 1250 get(axisId)->recvDistributedAttributes_old(ranks, buffers); 1251 } 1252 CATCH 1253 1254 /* 1255 Receive the non-distributed attributes from another group of clients 1256 \param [in] ranks rank of the sender 1257 \param [in] buffers buffer containing data sent from the sender 1258 */ 1259 void CAxis::recvDistributedAttributes_old(vector<int>& ranks, vector<CBufferIn*> buffers) 1260 TRY 1261 { 1262 int nbReceived = ranks.size(), idx, ind, gloInd, locInd; 1263 vector<CArray<int,1> > vec_indi(nbReceived), vec_dataInd(nbReceived); 1264 vector<CArray<double,1> > vec_val(nbReceived); 1265 vector<CArray<double,2> > vec_bounds(nbReceived); 1266 vector<CArray<string,1> > vec_label(nbReceived); 1267 1268 for (idx = 0; idx < nbReceived; ++idx) 1269 { 1270 CBufferIn& buffer = *buffers[idx]; 1271 buffer >> vec_indi[idx]; 1272 buffer >> vec_dataInd[idx]; 1273 1274 buffer >> hasValue; 1275 if (hasValue) 1276 buffer >> vec_val[idx]; 1277 1278 buffer >> hasBounds; 1279 if (hasBounds) 1280 buffer >> vec_bounds[idx]; 1281 1282 buffer >> hasLabel; 1283 if (hasLabel) 1284 buffer >> vec_label[idx]; 1285 } 1286 1287 // Estimate size of index array 1288 int nbIndexGlob = 0; 1289 for (idx = 0; idx < nbReceived; ++idx) 1290 { 1291 nbIndexGlob += vec_indi[idx].numElements(); 1292 } 1293 1294 // Recompute global index 1295 // Take account of the overlapped index 1296 index.resize(nbIndexGlob); 1297 globalLocalIndexMap_.rehash(std::ceil(index.numElements()/globalLocalIndexMap_.max_load_factor())); 1298 nbIndexGlob = 0; 1299 int nbIndLoc = 0; 1300 for (idx = 0; idx < nbReceived; ++idx) 1301 { 1302 CArray<int,1>& tmp = vec_indi[idx]; 1303 for (ind = 0; ind < tmp.numElements(); ++ind) 1304 { 1305 gloInd = tmp(ind); 1306 nbIndLoc = (gloInd % n_glo)-begin; 1307 if (0 == globalLocalIndexMap_.count(gloInd)) 1308 { 1309 index(nbIndexGlob) = gloInd % n_glo; 1310 globalLocalIndexMap_[gloInd] = nbIndexGlob; 1311 ++nbIndexGlob; 1312 } 1313 } 1314 } 1315 1316 // Resize index to its real size 1317 if (nbIndexGlob==0) index.resize(nbIndexGlob) ; 1318 else index.resizeAndPreserve(nbIndexGlob); 1319 1320 int nbData = nbIndexGlob; 1321 CArray<int,1> nonCompressedData(nbData); 1322 nonCompressedData = -1; 1323 // Mask is incorporated into data_index and is not sent/received anymore 1324 mask.reset(); 1325 if (hasValue) 1326 value.resize(nbData); 1327 if (hasBounds) 1328 bounds.resize(2,nbData); 1329 if (hasLabel) 1330 label.resize(nbData); 1331 1332 nbData = 0; 1333 for (idx = 0; idx < nbReceived; ++idx) 1334 { 1335 CArray<int,1>& indi = vec_indi[idx]; 1336 CArray<int,1>& dataIndi = vec_dataInd[idx]; 1337 int nb = indi.numElements(); 1338 for (int n = 0; n < nb; ++n) 1339 { 1340 locInd = globalLocalIndexMap_[size_t(indi(n))]; 1341 1342 nonCompressedData(locInd) = (-1 == nonCompressedData(locInd)) ? dataIndi(n) : nonCompressedData(locInd); 1343 1344 if (hasValue) 1345 value(locInd) = vec_val[idx](n); 1346 1347 if (hasBounds) 1348 { 1349 bounds(0,locInd) = vec_bounds[idx](0,n); 1350 bounds(1,locInd) = vec_bounds[idx](1,n); 1351 } 1352 1353 if (hasLabel) 1354 label(locInd) = vec_label[idx](n); 1355 } 1356 } 1357 1358 int nbCompressedData = 0; 1359 for (idx = 0; idx < nonCompressedData.numElements(); ++idx) 1360 { 1361 if (0 <= nonCompressedData(idx)) 1362 ++nbCompressedData; 1363 } 1364 1365 data_index.resize(nbCompressedData); 1366 nbCompressedData = 0; 1367 for (idx = 0; idx < nonCompressedData.numElements(); ++idx) 1368 { 1369 if (0 <= nonCompressedData(idx)) 1370 { 1371 data_index(nbCompressedData) = idx % n; 1372 ++nbCompressedData; 1373 } 1374 } 1375 1376 data_begin.setValue(0); 1377 data_n.setValue(data_index.numElements()); 1378 } 1379 CATCH_DUMP_ATTR 1380 460 1381 461 /*! 1382 462 Compare two axis objects. -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/axis.hpp
r1956 r1975 50 50 enum EEventId 51 51 { 52 EVENT_ID_DISTRIBUTION_ATTRIBUTE,53 EVENT_ID_DISTRIBUTED_VALUE,54 EVENT_ID_NON_DISTRIBUTED_VALUE,55 EVENT_ID_NON_DISTRIBUTED_ATTRIBUTES,56 EVENT_ID_DISTRIBUTED_ATTRIBUTES,57 52 EVENT_ID_AXIS_DISTRIBUTION, 58 53 EVENT_ID_SEND_DISTRIBUTED_ATTRIBUTE … … 75 70 /// Accesseurs /// 76 71 const std::set<StdString> & getRelFiles(void) const; 77 78 int getNumberWrittenIndexes(MPI_Comm writtenCom); 79 int getTotalNumberWrittenIndexes(MPI_Comm writtenCom); 80 int getOffsetWrittenIndexes(MPI_Comm writtenCom); 81 CArray<int, 1>& getCompressedIndexToWriteOnServer(MPI_Comm writtenCom); 82 72 83 73 std::map<int, StdSize> getAttributesBufferSize(CContextClient* client, const std::vector<int>& globalDim, int orderPositionInGrid, 84 74 CServerDistributionDescription::ServerDistributionType disType = CServerDistributionDescription::BAND_DISTRIBUTION); … … 126 116 void checkAttributes(void); 127 117 bool checkAttributes_done_ = false ; 128 129 void checkAttributesOnClient();130 void checkAttributesOnClientAfterTransformation(const std::vector<int>& globalDim, int orderPositionInGrid,131 CServerDistributionDescription::ServerDistributionType distType = CServerDistributionDescription::BAND_DISTRIBUTION);132 void sendCheckedAttributes(const std::vector<int>& globalDim, int orderPositionInGrid,133 CServerDistributionDescription::ServerDistributionType disType = CServerDistributionDescription::BAND_DISTRIBUTION);134 118 135 119 size_t getGlobalWrittenSize(void) ; 136 120 137 void computeWrittenIndex();138 void computeWrittenCompressedIndex(MPI_Comm);139 121 bool hasTransformation(); 140 122 void solveInheritanceTransformation(); … … 149 131 bool hasLabel; 150 132 151 CArray<int,1> localIndexToWriteOnServer; 152 153 void computeConnectedClients(CContextClient* client, const std::vector<int>& globalDim, int orderPositionInGrid); 154 private: std::set<CContextClient*> computeConnectedClients_done_ ; public : 155 /** The number of server of a context client. Avoid to re-compute indice computed in a previous computeConnectedClient */ 156 private: std::set<int> listNbServer_ ; public: 157 158 private: 133 private: 159 134 void checkData(); 160 135 void checkMask(); 161 136 void checkBounds(); 162 137 void checkLabel(); 163 public:164 void sendAxisToFileServer(CContextClient* client, const std::vector<int>& globalDim, int orderPositionInGrid) ;165 private:166 std::set<CContextClient*> sendAxisToFileServer_done_ ;167 138 168 139 public: … … 175 146 176 147 private: 177 void sendAttributes(CContextClient* client, const std::vector<int>& globalDim, int orderPositionInGrid,178 CServerDistributionDescription::ServerDistributionType distType, const string& axisId="");179 void sendDistributionAttribute(CContextClient* client, const std::vector<int>& globalDim, int orderPositionInGrid,180 CServerDistributionDescription::ServerDistributionType distType, const string& axisId="");181 182 183 void sendNonDistributedAttributes(CContextClient* client, const string& axisId="");184 void sendDistributedAttributes_old(CContextClient* client, const string& axisId="");185 186 static void recvNonDistributedAttributes(CEventServer& event);187 static void recvDistributedAttributes_old(CEventServer& event);188 static void recvDistributionAttribute(CEventServer& event);189 void recvNonDistributedAttributes(int rank, CBufferIn& buffer);190 void recvDistributedAttributes_old(vector<int>& rank, vector<CBufferIn*> buffers);191 void recvDistributionAttribute(CBufferIn& buffer);192 193 148 void setTransformations(const TransMapTypes&); 194 149 … … 221 176 private: 222 177 bool isChecked; 223 bool areClientAttributesChecked_;224 bool isClientAfterTransformationChecked;225 178 std::set<StdString> relFiles, relFilesCompressed; 226 179 TransMapTypes transformationMap_; 227 180 228 std::map<int, map<int,int> > nbSenders; // Mapping of number of communicating client to a server229 181 std::map<int, std::unordered_map<int, vector<size_t> > > indSrv_; // Global index of each client sent to server 230 // std::map<int, vector<int> > indWrittenSrv_; // Global written index of each client sent to server231 std::unordered_map<size_t,size_t> globalLocalIndexMap_;232 std::map<int,int> numberWrittenIndexes_, totalNumberWrittenIndexes_, offsetWrittenIndexes_;233 std::map<int, CArray<int, 1> > compressedIndexToWriteOnServer;234 182 std::map<int, std::vector<int> > connectedServerRank_; 235 bool computedWrittenIndex_;236 183 237 184 private: -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/scalar.cpp
r1958 r1975 70 70 addWorkflowView() ; 71 71 addModelView() ; 72 }73 74 void CScalar::checkAttributesOnClient()75 {76 77 72 } 78 73 … … 158 153 } 159 154 160 void CScalar::sendScalarToFileServer(CContextClient* client) 161 { 162 if (sendScalarToFileServer_done_.count(client)!=0) return ; 163 else sendScalarToFileServer_done_.insert(client) ; 164 StdString scalarDefRoot("scalar_definition"); 165 CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot); 166 this->sendAllAttributesToServer(client); 167 } 168 155 /* obsolete, to remove after reimplementing coupling */ 169 156 void CScalar::sendScalarToCouplerOut(CContextClient* client, const string& fieldId, int posInGrid) 170 157 { … … 173 160 174 161 string scalarId="_scalar["+std::to_string(posInGrid)+"]_of_"+fieldId ; 175 176 if (!scalar_ref.isEmpty())177 {178 auto scalar_ref_tmp=scalar_ref.getValue() ;179 scalar_ref.reset() ; // remove the reference, find an other way to do that more cleanly180 this->sendAllAttributesToServer(client, scalarId) ;181 scalar_ref = scalar_ref_tmp ;182 }183 else this->sendAllAttributesToServer(client, scalarId) ;184 185 162 186 163 this->sendAllAttributesToServer(client, scalarId); -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/scalar.hpp
r1958 r1975 86 86 void addRelFile(const StdString& filename); 87 87 bool IsWritten(const StdString& filename) const; 88 void checkAttributesOnClient(); 89 virtual void parse(xml::CXMLNode & node); 88 virtual void parse(xml::CXMLNode & node); 90 89 91 public:92 void sendScalarToFileServer(CContextClient* client) ;93 private:94 std::set<CContextClient*> sendScalarToFileServer_done_ ;95 96 90 public: 97 91 void sendScalarToCouplerOut(CContextClient* client, const string& fieldId, int posInGrid) ;
Note: See TracChangeset
for help on using the changeset viewer.