Changeset 833 for XIOS/trunk/src/client_client_dht_template_impl.hpp
- Timestamp: 04/08/16 15:00:15 (8 years ago)
- File: 1 edited
Legend:
- Unmodified: no prefix
- Added: prefixed with +
- Removed: prefixed with -
XIOS/trunk/src/client_client_dht_template_impl.hpp
--- r832
+++ r833

                                                          const MPI_Comm& clientIntraComm,
                                                          int hierarLvl)
- : index2InfoMapping_(), indexToInfoMappingLevel_()
-{
-  this->computeMPICommLevel(clientIntraComm, hierarLvl);
-  int lvl = this->commLevel_.size() - 1;
-  computeDistributedIndex(indexInfoMap, this->commLevel_[lvl], lvl);
+ : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_()
+{
+  this->computeMPICommLevel();
+  int nbLvl = this->getNbLevel();
+  sendRank_.resize(nbLvl);
+  recvRank_.resize(nbLvl);
+  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
 }
…
 void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
 {
-  int lvl = this->commLevel_.size() - 1;
-  computeIndexInfoMappingLevel(indices, this->commLevel_[lvl], lvl);
+  int nbLvl = this->getNbLevel();
+  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
 }
…
                                                             int level)
 {
-  int nbClient, clientRank;
-  MPI_Comm_size(commLevel,&nbClient);
+  int clientRank;
   MPI_Comm_rank(commLevel,&clientRank);
+  int groupRankBegin = this->getGroupBegin()[level];
+  int nbClient = this->getNbInGroup()[level];
   std::vector<size_t> hashedIndex;
   computeHashIndex(hashedIndex, nbClient);
…
   std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                       iteClientHash = hashedIndex.end();
-  std::map<int, std::vector<size_t> > client2ClientIndex;
+  std::vector<int> sendBuff(nbClient,0);
+  std::vector<int> sendNbIndexBuff(nbClient,0);

   // Number of global index whose mapping server are on other clients
   int nbIndexToSend = 0;
+  size_t index;
   HashXIOS<size_t> hashGlobalIndex;
   for (int i = 0; i < ssize; ++i)
   {
-    size_t index = indices(i);
+    index = indices(i);
     hashedVal = hashGlobalIndex(index);
     itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
-    if (iteClientHash != itClientHash)
+    int indexClient = std::distance(itbClientHash, itClientHash)-1;
+    ++sendNbIndexBuff[indexClient];
+  }
+
+  std::map<int, size_t* > client2ClientIndex;
+  for (int idx = 0; idx < nbClient; ++idx)
+  {
+    if (0 != sendNbIndexBuff[idx])
+    {
+      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
+      nbIndexToSend += sendNbIndexBuff[idx];
+      sendBuff[idx] = 1;
+      sendNbIndexBuff[idx] = 0;
+    }
+  }
+
+  for (int i = 0; i < ssize; ++i)
+  {
+    index = indices(i);
+    hashedVal = hashGlobalIndex(index);
+    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
     {
       int indexClient = std::distance(itbClientHash, itClientHash)-1;
       {
-        client2ClientIndex[indexClient].push_back(index);
-        ++nbIndexToSend;
+        client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;;
+        ++sendNbIndexBuff[indexClient];
       }
     }
   }

-  int* sendBuff = new int[nbClient];
-  for (int i = 0; i < nbClient; ++i) sendBuff[i] = 0;
-  std::map<int, std::vector<size_t> >::iterator itb = client2ClientIndex.begin(), it,
-                                                 ite = client2ClientIndex.end();
-  for (it = itb; it != ite; ++it) sendBuff[it->first] = 1;
-  int* recvBuff = new int[nbClient];
-  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
+  int recvNbClient, recvNbIndexCount;
+  sendRecvRank(level, sendBuff, sendNbIndexBuff,
+               recvNbClient, recvNbIndexCount);
+
+  std::map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
+                                    iteIndex = client2ClientIndex.end();

   std::list<MPI_Request> sendIndexRequest;
-  if (0 != nbIndexToSend)
-    for (it = itb; it != ite; ++it)
-      sendIndexToClients(it->first, it->second, commLevel, sendIndexRequest);
-
-  int nbDemandingClient = recvBuff[clientRank], nbSendBuffInfoReceived = 0;
+  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
+    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, sendIndexRequest);
+
+  int nbDemandingClient = recvNbClient; //recvBuff[clientRank],
+  int nbSendBuffInfoReceived = 0;

   // Receiving demand as well as the responds from other clients
…
   // Buffer to receive demand from other clients, it can be allocated or not depending whether it has demand(s)
   // There are some cases we demand duplicate index so need to determine maxium size of demanding buffer
-  for (it = itb; it != ite; ++it) sendBuff[it->first] = (it->second).size();
-  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
-
   unsigned long* recvBuffIndex = 0;
-  int maxNbIndexDemandedFromOthers = recvBuff[clientRank];
-
+  int maxNbIndexDemandedFromOthers = recvNbIndexCount;
   if (0 != maxNbIndexDemandedFromOthers)
     recvBuffIndex = new unsigned long[maxNbIndexDemandedFromOthers];

   // Buffer to receive respond from other clients, it can be allocated or not depending whether it demands other clients
-  // InfoType* recvBuffInfo = 0;
   unsigned char* recvBuffInfo = 0;
   int nbIndexReceivedFromOthers = nbIndexToSend;
…
   {
     --level;
-    computeIndexInfoMappingLevel(tmpGlobalIndexOnClient, this->commLevel_[level], level);
+    computeIndexInfoMappingLevel(tmpGlobalIndexOnClient, this->internalComm_, level);
   }
   else
…
       int clientSourceRank = statusInfo.MPI_SOURCE;
       unsigned char* beginBuff = infoBuffBegin[clientSourceRank];
-      std::vector<size_t>& indexTmp = client2ClientIndex[clientSourceRank];
+      size_t* indexTmp = client2ClientIndex[clientSourceRank];
       int infoIndex = 0;
       for (int i = 0; i < actualCountInfo; ++i)
…
   if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndex;
   if (0 != nbIndexReceivedFromOthers) delete [] recvBuffInfo;
+  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex) delete [] itIndex->second;
   for (int idx = 0; idx < infoToSend.size(); ++idx) delete [] infoToSend[idx];
-  delete [] sendBuff;
-  delete [] recvBuff;
 }

…
                                                        int level)
 {
-  int nbClient, clientRank;
-  MPI_Comm_size(commLevel,&nbClient);
+  int clientRank;
   MPI_Comm_rank(commLevel,&clientRank);
+  computeSendRecvRank(level, clientRank);
+
+  int groupRankBegin = this->getGroupBegin()[level];
+  int nbClient = this->getNbInGroup()[level];
   std::vector<size_t> hashedIndex;
   computeHashIndex(hashedIndex, nbClient);

-  int* sendBuff = new int[nbClient];
-  int* sendNbIndexBuff = new int[nbClient];
-  for (int i = 0; i < nbClient; ++i)
-  {
-    sendBuff[i] = 0; sendNbIndexBuff[i] = 0;
-  }
-
-  // Compute size of sending and receving buffer
-  std::map<int, std::vector<size_t> > client2ClientIndex;
-  std::map<int, std::vector<InfoType> > client2ClientInfo;
-
+  std::vector<int> sendBuff(nbClient,0);
+  std::vector<int> sendNbIndexBuff(nbClient,0);
   std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                       iteClientHash = hashedIndex.end();
-  typename boost::unordered_map<size_t,InfoType>::const_iterator it = indexInfoMap.begin(),
+  typename boost::unordered_map<size_t,InfoType>::const_iterator itb = indexInfoMap.begin(),it,
                                                                  ite = indexInfoMap.end();
   HashXIOS<size_t> hashGlobalIndex;
-  for (; it != ite; ++it)
+
+  // Compute size of sending and receving buffer
+  for (it = itb; it != ite; ++it)
   {
     size_t hashIndex = hashGlobalIndex(it->first);
     itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
-    if (itClientHash != iteClientHash)
     {
       int indexClient = std::distance(itbClientHash, itClientHash)-1;
       {
-        sendBuff[indexClient] = 1;
         ++sendNbIndexBuff[indexClient];
-        client2ClientIndex[indexClient].push_back(it->first);
-        client2ClientInfo[indexClient].push_back(it->second);
       }
     }
   }

+  std::map<int, size_t*> client2ClientIndex;
+  std::map<int, unsigned char*> client2ClientInfo;
+  for (int idx = 0; idx < nbClient; ++idx)
+  {
+    if (0 != sendNbIndexBuff[idx])
+    {
+      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
+      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
+      sendNbIndexBuff[idx] = 0;
+      sendBuff[idx] = 1;
+    }
+  }
+
+  std::vector<int> sendNbInfo(nbClient,0);
+  for (it = itb; it != ite; ++it)
+  {
+    size_t hashIndex = hashGlobalIndex(it->first);
+    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
+    {
+      int indexClient = std::distance(itbClientHash, itClientHash)-1;
+      {
+        client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;;
+        ProcessDHTElement<InfoType>::packElement(it->second, client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
+        ++sendNbIndexBuff[indexClient];
+      }
+    }
+  }
+
   // Calculate from how many clients each client receive message.
-  int* recvBuff = new int[nbClient];
-  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
-  int recvNbClient = recvBuff[clientRank];
-
   // Calculate size of buffer for receiving message
-  int* recvNbIndexBuff = new int[nbClient];
-  MPI_Allreduce(sendNbIndexBuff, recvNbIndexBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
-  int recvNbIndexCount = recvNbIndexBuff[clientRank];
-  unsigned long* recvIndexBuff = new unsigned long[recvNbIndexCount];
-  unsigned char* recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
+  int recvNbClient, recvNbIndexCount;
+  sendRecvRank(level, sendBuff, sendNbIndexBuff,
+               recvNbClient, recvNbIndexCount);

   // If a client holds information about index and the corresponding which don't belong to it,
…
   // Contents of the message are index and its corresponding informatioin
   std::list<MPI_Request> sendRequest;
-  std::map<int, std::vector<size_t> >::iterator itIndex = client2ClientIndex.begin(),
-                                                iteIndex = client2ClientIndex.end();
-  for (; itIndex != iteIndex; ++itIndex)
-    sendIndexToClients(itIndex->first, itIndex->second, commLevel, sendRequest);
-  typename std::map<int, std::vector<InfoType> >::iterator itbInfo = client2ClientInfo.begin(), itInfo,
-                                                           iteInfo = client2ClientInfo.end();
-
-  std::vector<int> infoSizeToSend(client2ClientInfo.size(),0);
-  std::vector<unsigned char*> infoToSend(client2ClientInfo.size());
-  itInfo = itbInfo;
-  for (int idx = 0; itInfo != iteInfo; ++itInfo, ++idx)
-  {
-    const std::vector<InfoType>& infoVec = itInfo->second;
-    int infoVecSize = infoVec.size();
-    std::vector<int> infoIndex(infoVecSize);
-    for (int i = 0; i < infoVecSize; ++i)
-    {
-      infoIndex[i] = infoSizeToSend[idx];
-      ProcessDHTElement<InfoType>::packElement(infoVec[i], NULL, infoSizeToSend[idx]);
-    }
-
-    infoToSend[idx] = new unsigned char[infoSizeToSend[idx]];
-    infoSizeToSend[idx] = 0;
-    for (int i = 0; i < infoVecSize; ++i)
-    {
-      ProcessDHTElement<InfoType>::packElement(infoVec[i], infoToSend[idx], infoSizeToSend[idx]);
-    }
-
-    sendInfoToClients(itInfo->first, infoToSend[idx], infoSizeToSend[idx], commLevel, sendRequest);
-  }
-
+  std::map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
+                                    iteIndex = client2ClientIndex.end();
+  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
+    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, sendRequest);
+  std::map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
+                                          iteInfo = client2ClientInfo.end();
+  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
+    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, sendRequest);
+
+
+  unsigned long* recvIndexBuff = new unsigned long[recvNbIndexCount];
+  unsigned char* recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];

   std::map<int, MPI_Request>::iterator itRequestIndex, itRequestInfo;
…
   }

-  for (int idx = 0; idx < infoToSend.size(); ++idx) delete [] infoToSend[idx];
-  delete [] sendBuff;
-  delete [] sendNbIndexBuff;
-  delete [] recvBuff;
-  delete [] recvNbIndexBuff;
+  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex) delete [] itIndex->second;
+  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo) delete [] itInfo->second;
   delete [] recvIndexBuff;
   delete [] recvInfoBuff;
…
   {
     --level;
-    computeDistributedIndex(indexToInfoMapping, this->commLevel_[level], level);
+    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
   }
   else
…
 */
 template<typename T, typename H>
-void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, std::vector<size_t>& indices,
+void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
                                                        const MPI_Comm& clientIntraComm,
                                                        std::list<MPI_Request>& requestSendIndex)
…
   MPI_Request request;
   requestSendIndex.push_back(request);
-  MPI_Isend(&(indices)[0], (indices).size(), MPI_UNSIGNED_LONG,
+  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
             clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
 }
…
 }

-}
+/*!
+  Compute how many processes one process needs to send to and from how many processes it will receive
+*/
+template<typename T, typename H>
+void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
+{
+  int groupBegin = this->getGroupBegin()[level];
+  int nbInGroup = this->getNbInGroup()[level];
+  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
+  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];
+
+  std::vector<size_t> hashedIndexGroup;
+  computeHashIndex(hashedIndexGroup, nbInGroup);
+  size_t a = hashedIndexGroup[rank-groupBegin];
+  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;
+
+  int currentGroup, offset;
+  size_t e,f;
+
+  // Do a simple math [a,b) intersect [c,d)
+  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
+  {
+    std::vector<size_t> hashedIndexGroupParent;
+    int nbInGroupParent = nbInGroupParents[idx];
+    if (0 != nbInGroupParent)
+      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
+    for (int i = 0; i < nbInGroupParent; ++i)
+    {
+      size_t c = hashedIndexGroupParent[i];
+      size_t d = hashedIndexGroupParent[i+1]-1;
+
+      if (!((d < a) || (b <c)))
+        recvRank_[level].push_back(groupParentBegin[idx]+i);
+    }
+
+    offset = rank - groupParentBegin[idx];
+    if ((offset<nbInGroupParents[idx]) && (0 <= offset))
+    {
+      e = hashedIndexGroupParent[offset];
+      f = hashedIndexGroupParent[offset+1]-1;
+    }
+  }
+
+  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
+                                      iteHashGroup = hashedIndexGroup.end();
+  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
+  int begin = std::distance(itbHashGroup, itHashGroup)-1;
+  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
+  int end = std::distance(itbHashGroup, itHashGroup) -1;
+  sendRank_[level].resize(end-begin+1);
+  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
+}
+
+/*!
+  Send and receive number of process each process need to listen to as well as number
+  of index it will receive
+*/
+template<typename T, typename H>
+void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
+                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
+                                                 int& recvNbRank, int& recvNbElements)
+{
+  int groupBegin = this->getGroupBegin()[level];
+
+  int offSet = 0;
+  std::vector<int>& sendRank = sendRank_[level];
+  std::vector<int>& recvRank = recvRank_[level];
+  int sendBuffSize = sendRank.size();
+  int* sendBuff = new int [sendBuffSize*2];
+  std::vector<MPI_Request> request(sendBuffSize);
+  std::vector<MPI_Status> requestStatus(sendBuffSize);
+  int recvBuffSize = recvRank.size();
+  int* recvBuff = new int [2];
+
+  for (int idx = 0; idx < sendBuffSize; ++idx)
+  {
+    offSet = sendRank[idx]-groupBegin;
+    sendBuff[idx*2] = sendNbRank[offSet];
+    sendBuff[idx*2+1] = sendNbElements[offSet];
+  }
+
+  for (int idx = 0; idx < sendBuffSize; ++idx)
+  {
+    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
+              sendRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[idx]);
+  }
+
+  MPI_Status status;
+  int nbRecvRank = 0, nbRecvElements = 0;
+  for (int idx = 0; idx < recvBuffSize; ++idx)
+  {
+    MPI_Recv(recvBuff, 2, MPI_INT,
+             recvRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &status);
+    nbRecvRank += *(recvBuff);
+    nbRecvElements += *(recvBuff+1);
+  }
+
+  MPI_Waitall(sendBuffSize, &request[0], &requestStatus[0]);
+
+  recvNbRank = nbRecvRank;
+  recvNbElements = nbRecvElements;
+
+  delete [] sendBuff;
+  delete [] recvBuff;
+}
+
+}
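The lookup pattern repeated throughout this changeset maps a hashed global index to the client that owns it: computeHashIndex fills hashedIndex with nbClient+1 bucket boundaries, and std::upper_bound followed by std::distance minus one selects the owning rank within the group. Below is a minimal, self-contained sketch of that bucket lookup; the helper names computeHashBounds and ownerOfIndex and the uniform boundary layout are illustrative assumptions, not the XIOS implementation.

{{{
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

// Hypothetical stand-in for XIOS's computeHashIndex: split the hash space
// [0, maxHash] into nbClient contiguous buckets and return the nbClient+1
// bucket boundaries.
std::vector<std::size_t> computeHashBounds(int nbClient, std::size_t maxHash)
{
  std::vector<std::size_t> bounds(nbClient + 1);
  for (int i = 0; i < nbClient; ++i)
    bounds[i] = (maxHash / nbClient) * i;
  bounds[nbClient] = maxHash;
  return bounds;
}

// Same idiom as the changeset: upper_bound on the boundaries, then
// distance-1 gives the rank (within the group) owning the hashed value.
int ownerOfIndex(std::size_t hashedVal, const std::vector<std::size_t>& bounds)
{
  std::vector<std::size_t>::const_iterator it =
    std::upper_bound(bounds.begin(), bounds.end(), hashedVal);
  return static_cast<int>(std::distance(bounds.begin(), it)) - 1;
}

int main()
{
  const int nbClient = 4;
  std::vector<std::size_t> bounds = computeHashBounds(nbClient, 1000);

  // First pass of the new code: count how many indices go to each client
  // (this is what fills sendNbIndexBuff before any send buffer is allocated).
  const std::size_t hashedIndices[] = {3, 250, 251, 999};
  std::vector<int> sendNbIndexBuff(nbClient, 0);
  for (int i = 0; i < 4; ++i)
    ++sendNbIndexBuff[ownerOfIndex(hashedIndices[i], bounds)];

  for (int rank = 0; rank < nbClient; ++rank)
    std::cout << "client " << rank << " will receive "
              << sendNbIndexBuff[rank] << " indices\n";
  return 0;
}
}}}

Counting first and allocating exact-size raw buffers afterwards is what lets r833 drop the std::vector-based client2ClientIndex maps of r832.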
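The other recurring change is that the MPI_Allreduce calls of r832, which computed how many peers and how many indices each process should expect, are replaced by targeted point-to-point exchanges over the precomputed sendRank_/recvRank_ lists (computeSendRecvRank and sendRecvRank). The sketch below shows that exchange pattern in isolation, assuming the send and receive rank lists are already known and that the count arrays are aligned with the send-rank list; the function exchangeCounts and the ring topology in main are illustrative, not part of XIOS.

{{{
#include <mpi.h>
#include <cstddef>
#include <vector>

// Post one non-blocking send of a {number of peers, number of indices} pair
// to every rank we will contact, then receive one such pair from every rank
// expected to contact us and accumulate the totals.
void exchangeCounts(const std::vector<int>& sendRanks,
                    const std::vector<int>& recvRanks,
                    const std::vector<int>& sendNbRank,
                    const std::vector<int>& sendNbElements,
                    int& recvNbRank, int& recvNbElements,
                    MPI_Comm comm, int tag)
{
  std::vector<int> sendBuff(2 * sendRanks.size());
  std::vector<MPI_Request> requests(sendRanks.size());

  for (std::size_t i = 0; i < sendRanks.size(); ++i)
  {
    sendBuff[2*i]   = sendNbRank[i];      // how many peers will contact the destination
    sendBuff[2*i+1] = sendNbElements[i];  // how many indices it will receive from us
    MPI_Isend(&sendBuff[2*i], 2, MPI_INT, sendRanks[i], tag, comm, &requests[i]);
  }

  recvNbRank = 0;
  recvNbElements = 0;
  int recvBuff[2];
  for (std::size_t i = 0; i < recvRanks.size(); ++i)
  {
    MPI_Recv(recvBuff, 2, MPI_INT, recvRanks[i], tag, comm, MPI_STATUS_IGNORE);
    recvNbRank     += recvBuff[0];
    recvNbElements += recvBuff[1];
  }

  if (!requests.empty())
    MPI_Waitall(static_cast<int>(requests.size()), &requests[0], MPI_STATUSES_IGNORE);
}

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  // Toy ring: each rank tells its right-hand neighbour to expect one peer
  // holding ten indices, and hears the same from its left-hand neighbour.
  std::vector<int> sendRanks(1, (rank + 1) % size);
  std::vector<int> recvRanks(1, (rank - 1 + size) % size);
  std::vector<int> sendNbRank(1, 1), sendNbElements(1, 10);

  int recvNbRank = 0, recvNbElements = 0;
  exchangeCounts(sendRanks, recvRanks, sendNbRank, sendNbElements,
                 recvNbRank, recvNbElements, MPI_COMM_WORLD, 0);

  MPI_Finalize();
  return 0;
}
}}}

Because the counts only travel between ranks that actually exchange data, the cost scales with the number of communication partners rather than with the full size of the intra-communicator, which is the apparent motivation for replacing the collective calls.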