- Timestamp: 01/25/17 16:25:17
- File: 1 edited
Legend: ' ' unmodified · '+' added · '-' removed
XIOS/dev/branch_yushan/src/client_client_dht_template_impl.hpp
r892 → r1037

 #include "utils.hpp"
 #include "mpi_tag.hpp"
+#ifdef _usingEP
+#include "ep_declaration.hpp"
+#endif
+

 namespace xios
…
   : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
 {
+
   MPI_Comm_size(clientIntraComm, &nbClient_);
   this->computeMPICommLevel();
   int nbLvl = this->getNbLevel();
   sendRank_.resize(nbLvl);
   recvRank_.resize(nbLvl);
   Index2VectorInfoTypeMap indexToVecInfoMap;
   indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor()));
   typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end();
-  for (; it != ite; ++it) indexToVecInfoMap[it->first].push_back(it->second);
+  for (; it != ite; ++it)
+  {
+    indexToVecInfoMap[it->first].push_back(it->second);
+  }
+
   computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1);
 }

…
   {
     if (0 != recvNbIndexClientCount[idx])
+    {
       recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
+    }
     currentIndex += recvNbIndexClientCount[idx];
   }
…

   std::vector<MPI_Status> status(request.size());
+
+  //printf("1(%d): calling wait all for %lu requests\n", clientRank, request.size());
+
   MPI_Waitall(request.size(), &request[0], &status[0]);
+
+
+  //printf("  1(%d): calling wait all for %lu requests OK\n", clientRank, request.size());

   CArray<size_t,1>* tmpGlobalIndex;
…

   std::vector<MPI_Status> statusOnReturn(requestOnReturn.size());
+  //printf("2(%d): calling wait all for %lu requests\n", clientRank, requestOnReturn.size());
   MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);
+
+  //printf("  2(%d): calling wait all for %lu requests OK\n", clientRank, requestOnReturn.size());

   Index2VectorInfoTypeMap indexToInfoMapping;
…
                                              int level)
 {
+  //printf("in computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap, const MPI_Comm& commLevel, int level)\n");
   int clientRank;
   MPI_Comm_rank(commLevel,&clientRank);
…
   }

+  //printf("check 4 OK. clientRank = %d\n", clientRank);
+
   // Calculate from how many clients each client receive message.
   // Calculate size of buffer for receiving message
   sendRecvRank(level, sendBuff, sendNbIndexBuff,
                recvRankClient, recvNbIndexClientCount);
+  //printf("sendRecvRank OK\n");

   int recvNbIndexCount = 0;
…
     recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
   }
+
+  //printf("check 5 OK. clientRank = %d\n", clientRank);

   // If a client holds information about index and the corresponding which don't belong to it,
…
   {
     recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
+
     recvInfoFromClients(recvRankClient[idx],
                         recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                         recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
                         commLevel, request);
+
   }
   currentIndex += recvNbIndexClientCount[idx];
 }
+
+//printf("check 6 OK. clientRank = %d\n", clientRank);

 boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                               iteIndex = client2ClientIndex.end();
 for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
+{
   sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
+
+}
+
+//printf("check 7 OK. clientRank = %d\n", clientRank);
+
 boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
                                                     iteInfo = client2ClientInfo.end();
 for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
+{
   sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);

+}
+
+//printf("check 8 OK. clientRank = %d\n", clientRank);
 std::vector<MPI_Status> status(request.size());
+
 MPI_Waitall(request.size(), &request[0], &status[0]);
…
   {
     ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
+    unsigned long pp = *(recvIndexBuff+currentIndex+i);
+
     indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
   }
   currentIndex += count;
 }
+
+//printf("check 9 OK. clientRank = %d\n", clientRank);

 if (0 != recvNbIndexCount)
…
     delete [] it->second;

+  //printf("check 10 OK. clientRank = %d\n", clientRank);
   // Ok, now do something recursive
   if (0 < level)
…
     MPI_Request request;
     requestSendIndex.push_back(request);
+
     MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
               clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
…
     MPI_Request request;
     requestRecvIndex.push_back(request);
+
     MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
               clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
…
     MPI_Request request;
     requestSendInfo.push_back(request);
+    //printf("MPI_IsendInfo(info, infoSize, MPI_CHAR,... char count = %d, dest = %d, buf_size = %d\n", infoSize, clientDestRank, sizeof(*info) );
     MPI_Isend(info, infoSize, MPI_CHAR,
               clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
…
     ++nRequest;
   }

+  int clientRank;
+  MPI_Comm_rank(this->internalComm_,&clientRank);
+  //printf("4(%d): calling wait all for %lu requests\n", clientRank, sendNbRank.size()+recvNbRank.size());
   MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
+  //printf("  4(%d): calling wait all for %lu requests OK\n", clientRank, sendNbRank.size()+recvNbRank.size());
 }
…
                    std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
 {
+  int myRank;
+  MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
+  //printf("myRank = %d, in sendRecvRank(int level, const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements, std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)\n", myRank);
   int groupBegin = this->getGroupBegin()[level];
…
   for (int idx = 0; idx < recvBuffSize; ++idx)
   {
+    //printf("myRank = %d starts irecv with src = %d, tag = %d, idx = %d\n", myRank, recvRank[idx], MPI_DHT_INDEX_0, idx);
     MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
               recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
+    //printf("myRank = %d MPI_Irecv OK, idx = %d, nRequest = %d\n", myRank, idx, nRequest);
     ++nRequest;
   }
+
+  //printf("myRank = %d, check 1 OK\n", myRank);

   for (int idx = 0; idx < sendBuffSize; ++idx)
…
   for (int idx = 0; idx < sendBuffSize; ++idx)
   {
+    //printf("myRank = %d starts isend with dest = %d, tag = %d, idx = %d\n", myRank, sendRank[idx], MPI_DHT_INDEX_0, idx);
     MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
               sendRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
+    //printf("myRank = %d MPI_Isend OK, idx = %d, nRequest = %d\n", myRank, idx, nRequest);
     ++nRequest;
   }

+  MPI_Barrier(this->internalComm_);
+
+  //printf("myRank = %d, check 2 OK\n", myRank);
+
+  int clientRank;
+  MPI_Comm_rank(this->internalComm_,&clientRank);
+
+  //printf("5(%d): calling wait all for %lu requests\n", myRank, sendBuffSize+recvBuffSize);
   MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
+  //printf("  5(%d): calling wait all for %lu requests OK\n", myRank, sendBuffSize+recvBuffSize);
+  //printf("check 3 OK\n");
+
   int nbRecvRank = 0, nbRecvElements = 0;
   recvNbRank.clear();
…
     }
   }
+  //printf("check 4 OK\n");
 }

 }
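Nearly every hunk in this changeset touches the same non-blocking MPI idiom: push an empty MPI_Request onto a std::vector, let MPI_Isend/MPI_Irecv write the handle into the vector's back, and later complete the whole accumulated batch with a single MPI_Waitall. The following is a minimal, self-contained sketch of that idiom; the ring exchange and all buffer names are hypothetical illustrations, not XIOS code.

#include <mpi.h>
#include <vector>

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  // Hypothetical ring exchange: receive from the left neighbour, send to the right.
  int right = (rank + 1) % size;
  int left  = (rank - 1 + size) % size;
  unsigned long sendBuf = rank, recvBuf = 0;

  std::vector<MPI_Request> request;

  // Same idiom as sendIndexToClients/recvIndexFromClients in the diff above:
  // push_back an empty request, then let the non-blocking call fill request.back().
  request.push_back(MPI_Request());
  MPI_Irecv(&recvBuf, 1, MPI_UNSIGNED_LONG, left, 0, MPI_COMM_WORLD, &request.back());

  request.push_back(MPI_Request());
  MPI_Isend(&sendBuf, 1, MPI_UNSIGNED_LONG, right, 0, MPI_COMM_WORLD, &request.back());

  // One MPI_Waitall over the whole batch, as in computeIndexInfoMapping.
  std::vector<MPI_Status> status(request.size());
  MPI_Waitall(request.size(), &request[0], &status[0]);

  MPI_Finalize();
  return 0;
}

This pattern is safe even though push_back may reallocate the vector: MPI_Isend/MPI_Irecv copy the request handle out through the supplied pointer before returning, and MPI_Waitall only needs the handles to be contiguous at the moment it is called, which &request[0] provides.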