Changeset 839

Timestamp:
    04/15/16 14:27:42 (8 years ago)
Location:
    XIOS/trunk/src
Files:
    2 edited
Legend:
    unchanged lines carry no prefix; lines added in r839 are prefixed with "+", lines removed with "-"
XIOS/trunk/src/client_client_dht_template.hpp (r835 → r839)

   \author Ha NGUYEN
   \since 01 Oct 2015
-  \date 06 Oct 2015
+  \date 15 April 2016

   \brief Distributed hashed table implementation.
…
   void sendRecvRank(int level,
                     const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
-                    int& recvNbRank, int& recvNbElements);
+                    std::vector<int>& recvNbRank, std::vector<int>& recvNbElements);

 protected:
-  void probeIndexMessageFromClients(unsigned long* recvIndexGlobalBuff,
-                                    const int recvNbIndexCount,
-                                    int& countIndexGlobal,
-                                    std::map<int, unsigned long*>& indexGlobalBuffBegin,
-                                    std::map<int, MPI_Request>& requestRecvIndexGlobal,
-                                    const MPI_Comm& intraComm);
-
-  void probeInfoMessageFromClients(unsigned char* recvIndexServerBuff,
-                                   const int recvNbIndexCount,
-                                   int& countIndexServer,
-                                   std::map<int, unsigned char*>& infoBuffBegin,
-                                   std::map<int, MPI_Request>& requestRecvIndexServer,
-                                   const MPI_Comm& intraComm);
-
   // Send information to clients
   void sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
-                         const MPI_Comm& clientIntraComm, std::list<MPI_Request>& requestSendIndexServer);
+                         const MPI_Comm& clientIntraComm,
+                         std::vector<MPI_Request>& requestSendInfo);
+
+  void recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
+                           const MPI_Comm& clientIntraComm,
+                           std::vector<MPI_Request>& requestRecvInfo);

   // Send global index to clients
   void sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
-                          const MPI_Comm& clientIntraComm, std::list<MPI_Request>& requestSendIndexGlobal);
+                          const MPI_Comm& clientIntraComm,
+                          std::vector<MPI_Request>& requestSendIndexGlobal);

-  // Verify sending request
-  void testSendRequest(std::list<MPI_Request>& sendRequest);
+  void recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
+                            const MPI_Comm& clientIntraComm,
+                            std::vector<MPI_Request>& requestRecvIndex);

-  // Compute size of receiving buffer for global index
-  int computeBuffCountIndex(MPI_Request& requestRecv);
-
-  // Compute size of receiving buffer for server index
-  int computeBuffCountInfo(MPI_Request& requestRecv);
+  void sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
+                        const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements);

 protected:
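The pattern behind these interface changes: r835 collected send requests in a std::list and polled them with testSendRequest and MPI_Iprobe, while r839 pairs every send wrapper with a receive wrapper, accumulates all requests in one std::vector, and completes them with a single MPI_Waitall. A minimal standalone sketch of that calling pattern (a ring exchange; not XIOS code — the tag and buffers are placeholders):

    #include <mpi.h>
    #include <vector>
    #include <cstdio>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int rank, size;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);
      MPI_Comm_size(MPI_COMM_WORLD, &size);

      const int tag = 1;                     // stand-in for MPI_DHT_INDEX
      int left  = (rank + size - 1) % size;  // receive from the left neighbour
      int right = (rank + 1) % size;         // send to the right neighbour

      std::vector<unsigned long> sendBuff(4, rank), recvBuff(4);
      std::vector<MPI_Request> request;      // grows as the wrappers push requests

      // Post the receive first, as recvIndexFromClients does...
      request.push_back(MPI_Request());
      MPI_Irecv(&recvBuff[0], (int)recvBuff.size(), MPI_UNSIGNED_LONG,
                left, tag, MPI_COMM_WORLD, &request.back());

      // ...then the matching send, as sendIndexToClients does.
      request.push_back(MPI_Request());
      MPI_Isend(&sendBuff[0], (int)sendBuff.size(), MPI_UNSIGNED_LONG,
                right, tag, MPI_COMM_WORLD, &request.back());

      // One synchronisation point for the whole exchange.
      std::vector<MPI_Status> status(request.size());
      MPI_Waitall((int)request.size(), &request[0], &status[0]);

      std::printf("rank %d received indices from rank %d\n", rank, left);
      MPI_Finalize();
      return 0;
    }

Posting receives before the matching sends lets MPI deliver straight into the final buffers rather than queueing unexpected messages, and replaces the old busy-wait polling with one blocking completion call.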
XIOS/trunk/src/client_client_dht_template_impl.hpp (r835 → r839)
   int indexClient = std::distance(itbClientHash, itClientHash)-1;
   {
-    client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index; ;
+    client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
     ++sendNbIndexBuff[indexClient];
   }
…
 }

- int recvNbClient, recvNbIndexCount;
+ std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
-              recvNbClient, recvNbIndexCount);
+              recvRankClient, recvNbIndexClientCount);
+
+ int recvNbIndexCount = 0;
+ for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
+   recvNbIndexCount += recvNbIndexClientCount[idx];
+
+ unsigned long* recvIndexBuff;
+ if (0 != recvNbIndexCount)
+   recvIndexBuff = new unsigned long[recvNbIndexCount];
+
+ std::vector<MPI_Request> request;
+ std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
+                            iteRecvIndex = recvRankClient.end(),
+                            itbRecvNbIndex = recvNbIndexClientCount.begin(),
+                            itRecvNbIndex;
+ int currentIndex = 0;
+ int nbRecvClient = recvRankClient.size();
+ for (int idx = 0; idx < nbRecvClient; ++idx)
+ {
+   if (0 != recvNbIndexClientCount[idx])
+     recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
+   currentIndex += recvNbIndexClientCount[idx];
+ }

  std::map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                    iteIndex = client2ClientIndex.end();
-
- std::list<MPI_Request> sendIndexRequest;
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
-   sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, sendIndexRequest);
-
- int nbDemandingClient = recvNbClient; //recvBuff[clientRank],
- int nbSendBuffInfoReceived = 0;
-
- // Receiving demands as well as the responses from other clients
- // The demand message contains global indices; the responses carry server index information
- // Buffer to receive demands from other clients; it may or may not be allocated depending on whether there are demands
- // In some cases the same index is demanded more than once, so the maximum size of the demand buffer must be determined
- unsigned long* recvBuffIndex = 0;
- int maxNbIndexDemandedFromOthers = recvNbIndexCount;
- if (0 != maxNbIndexDemandedFromOthers)
-   recvBuffIndex = new unsigned long[maxNbIndexDemandedFromOthers];
-
- // Buffer to receive responses from other clients; it may or may not be allocated depending on whether this client demands from others
- unsigned char* recvBuffInfo = 0;
- int nbIndexReceivedFromOthers = nbIndexToSend;
- if (0 != nbIndexReceivedFromOthers)
-   recvBuffInfo = new unsigned char[nbIndexReceivedFromOthers*ProcessDHTElement<InfoType>::typeSize()];
-
- std::map<int, MPI_Request>::iterator itRequest;
- std::vector<int> demandAlreadyReceived, repondAlreadyReceived;
-
- int countIndex = 0;  // Count of the receive buffer for indices
- std::map<int, MPI_Request> requestRecvIndex; // Requests returned by MPI_Irecv for indices
-
- // Maps a client rank to the beginning of the receive buffer for the index message from that client
- std::map<int, unsigned long*> indexBuffBegin;
-
- std::map<int,std::vector<size_t> > src2Index; // Temporary mapping holding each demand (source and associated indices) at the current level
-
- CArray<size_t,1> tmpGlobalIndexOnClient(maxNbIndexDemandedFromOthers);
-
- int k = 0;
- while ((0 < nbDemandingClient) || (!sendIndexRequest.empty()))
- {
-   // Just check whether a client has any demand from other clients.
-   // If it has, then it should send responses to these client(s)
-   probeIndexMessageFromClients(recvBuffIndex, maxNbIndexDemandedFromOthers,
-                                countIndex, indexBuffBegin,
-                                requestRecvIndex, commLevel);
-   if (0 < nbDemandingClient)
-   {
-     for (itRequest = requestRecvIndex.begin();
-          itRequest != requestRecvIndex.end(); ++itRequest)
-     {
-       int flagIndexGlobal, count;
-       MPI_Status statusIndexGlobal;
-
-       MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal);
-       if (true == flagIndexGlobal)
-       {
-         MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
-         int clientSourceRank = statusIndexGlobal.MPI_SOURCE;
-         unsigned long* beginBuff = indexBuffBegin[clientSourceRank];
-         for (int i = 0; i < count; ++i)
-         {
-           src2Index[clientSourceRank].push_back(*(beginBuff+i));
-           tmpGlobalIndexOnClient(k) = *(beginBuff+i);
-           ++k;
-         }
-         --nbDemandingClient;
-
-         demandAlreadyReceived.push_back(clientSourceRank);
-       }
-     }
-     for (int i = 0; i < demandAlreadyReceived.size(); ++i)
-       requestRecvIndex.erase(demandAlreadyReceived[i]);
-   }
-
-   testSendRequest(sendIndexRequest);
- }
+   sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
+
+ std::vector<MPI_Status> status(request.size());
+ MPI_Waitall(request.size(), &request[0], &status[0]);
+
+ CArray<size_t,1>* tmpGlobalIndex;
+ if (0 != recvNbIndexCount)
+   tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
+ else
+   tmpGlobalIndex = new CArray<size_t,1>();

+ // OK, we go to the next level and do something recursive
  if (0 < level)
  {
    --level;
-   computeIndexInfoMappingLevel(tmpGlobalIndexOnClient, this->internalComm_, level);
+   computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
  }
- else
-   indexToInfoMappingLevel_ = index2InfoMapping_;
+ else // Now, we are in the last level where the necessary mappings are.
+   indexToInfoMappingLevel_= (index2InfoMapping_);

- std::map<int, std::vector<InfoType> > client2ClientInfo;
- std::vector<unsigned char*> infoToSend(src2Index.size());
- std::list<MPI_Request> sendInfoRequest;
- std::map<int, std::vector<size_t> >::iterator itSrc2Idx = src2Index.begin(),
-                                               iteSrc2Idx = src2Index.end();
- for (int i=0; itSrc2Idx != iteSrc2Idx; ++itSrc2Idx, ++i)
- {
-   int clientSourceRank = itSrc2Idx->first;
-   std::vector<size_t>& srcIdx = itSrc2Idx->second;
-   infoToSend[i] = new unsigned char [srcIdx.size()*ProcessDHTElement<InfoType>::typeSize()];
-   int infoIndex = 0;
-   for (int idx = 0; idx < srcIdx.size(); ++idx)
-   {
-     ProcessDHTElement<InfoType>::packElement(indexToInfoMappingLevel_[srcIdx[idx]], infoToSend[i], infoIndex);
-   }
-   sendInfoToClients(clientSourceRank, infoToSend[i], infoIndex, commLevel, sendInfoRequest);
- }
+ typename Index2InfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
+ std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
+ currentIndex = 0;
+ for (int idx = 0; idx < nbRecvClient; ++idx)
+ {
+   for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
+   {
+     itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
+     if (iteIndexToInfoMap != itIndexToInfoMap) ++sendNbIndexOnReturn[idx];
+   }
+   currentIndex += recvNbIndexClientCount[idx];
+ }
+
+ std::vector<int> recvRankOnReturn(client2ClientIndex.size());
+ std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
+ int indexIndex = 0;
+ for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
+ {
+   recvRankOnReturn[indexIndex] = itIndex->first;
+ }
+ sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
+                  recvRankOnReturn, recvNbIndexOnReturn);
+
+ int recvNbIndexCountOnReturn = 0;
+ for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
+   recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];
+
+ unsigned long* recvIndexBuffOnReturn;
+ unsigned char* recvInfoBuffOnReturn;
+ if (0 != recvNbIndexCountOnReturn)
+ {
+   recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
+   recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
+ }
+
+ std::vector<MPI_Request> requestOnReturn;
+ currentIndex = 0;
+ for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
+ {
+   if (0 != recvNbIndexOnReturn[idx])
+   {
+     recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
+     recvInfoFromClients(recvRankOnReturn[idx],
+                         recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
+                         recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
+                         commLevel, requestOnReturn);
+   }
+   currentIndex += recvNbIndexOnReturn[idx];
+ }
+
+ std::map<int,unsigned char*> client2ClientInfoOnReturn;
+ std::map<int,size_t*> client2ClientIndexOnReturn;
+ currentIndex = 0;
+ for (int idx = 0; idx < nbRecvClient; ++idx)
+ {
+   if (0 != sendNbIndexOnReturn[idx])
+   {
+     int rank = recvRankClient[idx];
+     client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
+     client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
+     unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
+     int infoIndex = 0;
+     int nb = 0;
+     for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
+     {
+       itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
+       if (iteIndexToInfoMap != itIndexToInfoMap)
+       {
+         client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
+         ProcessDHTElement<InfoType>::packElement(itIndexToInfoMap->second, tmpInfoPtr, infoIndex);
+         ++nb;
+       }
+     }
+
+     sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
+                        sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
+     sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
+                       sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
+   }
+   currentIndex += recvNbIndexClientCount[idx];
+ }
+
+ std::vector<MPI_Status> statusOnReturn(requestOnReturn.size());
+ MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);

  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
- int countInfo = 0; // Count of the receive buffer for server indices
- std::map<int, MPI_Request> requestRecvInfo;
-
- // Maps a client rank to the beginning of the receive buffer for the server-index message from that client
- std::map<int, unsigned char*> infoBuffBegin;
-
- while ((!sendInfoRequest.empty()) || (nbSendBuffInfoReceived < nbIndexReceivedFromOthers))
- {
-   testSendRequest(sendInfoRequest);
-
-   // In some cases, a client needs to listen for responses from other clients about server information
-   // With that information, a client can fill in its server-global index map.
-   probeInfoMessageFromClients(recvBuffInfo, nbIndexReceivedFromOthers,
-                               countInfo, infoBuffBegin,
-                               requestRecvInfo, commLevel);
-   for (itRequest = requestRecvInfo.begin();
-        itRequest != requestRecvInfo.end();
-        ++itRequest)
-   {
-     int flagInfo, count;
-     MPI_Status statusInfo;
-
-     MPI_Test(&(itRequest->second), &flagInfo, &statusInfo);
-     if (true == flagInfo)
-     {
-       MPI_Get_count(&statusInfo, MPI_CHAR, &count);
-       int actualCountInfo = count/infoTypeSize;
-       int clientSourceRank = statusInfo.MPI_SOURCE;
-       unsigned char* beginBuff = infoBuffBegin[clientSourceRank];
-       size_t* indexTmp = client2ClientIndex[clientSourceRank];
-       int infoIndex = 0;
-       for (int i = 0; i < actualCountInfo; ++i)
-       {
-         ProcessDHTElement<InfoType>::unpackElement(indexToInfoMapping[indexTmp[i]], beginBuff, infoIndex);
-       }
-       nbSendBuffInfoReceived += actualCountInfo;
-       repondAlreadyReceived.push_back(clientSourceRank);
-     }
-   }
-
-   for (int i = 0; i < repondAlreadyReceived.size(); ++i)
-     requestRecvInfo.erase(repondAlreadyReceived[i]);
-   repondAlreadyReceived.resize(0);
- }
-
- indexToInfoMappingLevel_.swap(indexToInfoMapping);
- if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndex;
- if (0 != nbIndexReceivedFromOthers) delete [] recvBuffInfo;
- for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex) delete [] itIndex->second;
- for (int idx = 0; idx < infoToSend.size(); ++idx) delete [] infoToSend[idx];
+ int infoIndex = 0;
+ for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
+ {
+   ProcessDHTElement<InfoType>::unpackElement(indexToInfoMapping[recvIndexBuffOnReturn[idx]], recvInfoBuffOnReturn, infoIndex);
+ }
+
+ indexToInfoMappingLevel_ = (indexToInfoMapping);
+ if (0 != recvNbIndexCount) delete [] recvIndexBuff;
+ for (std::map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
+      it != client2ClientIndex.end(); ++it)
+   delete [] it->second;
+ delete tmpGlobalIndex;
+
+ if (0 != recvNbIndexCountOnReturn)
+ {
+   delete [] recvIndexBuffOnReturn;
+   delete [] recvInfoBuffOnReturn;
+ }
+
+ for (std::map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
+      it != client2ClientInfoOnReturn.end(); ++it)
+   delete [] it->second;
+
+ for (std::map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
+      it != client2ClientIndexOnReturn.end(); ++it)
+   delete [] it->second;
 }
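As rewritten, computeIndexInfoMappingLevel runs in two phases: each process first scatters the indices it wants resolved to their hashed owners, and each owner then returns only the index/info pairs it actually holds, which is why reply counts must be negotiated through sendRecvOnReturn before the return buffers can be allocated. A serial sketch of the owner-side filtering step, with int standing in for InfoType and memcpy for ProcessDHTElement::packElement (both simplifications, not the XIOS serializer):

    #include <cstring>
    #include <cstdio>
    #include <map>
    #include <vector>

    int main()
    {
      std::map<size_t, int> localMap;         // stand-in for indexToInfoMappingLevel_
      localMap[7] = 70; localMap[42] = 420;

      size_t recvIndexBuff[3] = {7, 13, 42};  // indices one requester asked about

      std::vector<size_t> replyIndex;         // analogue of client2ClientIndexOnReturn[rank]
      std::vector<unsigned char> replyInfo;   // analogue of client2ClientInfoOnReturn[rank]

      // Index 13 is unknown here, so it is skipped: only found pairs travel back,
      // which is exactly why reply sizes are exchanged via sendRecvOnReturn first.
      for (int i = 0; i < 3; ++i)
      {
        std::map<size_t, int>::iterator it = localMap.find(recvIndexBuff[i]);
        if (it != localMap.end())
        {
          replyIndex.push_back(it->first);
          unsigned char packed[sizeof(int)];
          std::memcpy(packed, &it->second, sizeof(int));
          replyInfo.insert(replyInfo.end(), packed, packed + sizeof(int));
        }
      }

      std::printf("answering %zu of 3 queried indices\n", replyIndex.size());
      return 0;
    }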
…
 /*!
   Compute distribution of global index for servers
-  Each client already holds a piece of information and its attached index.
-  This information will be redistributed among processes by projecting indices into size_t space.
+  Each client already holds a piece of information and its associated index.
+  This information will be redistributed among processes by projecting indices into size_t space,
+  and the corresponding information will also be distributed over the size_t space.
   After the redistribution, each client holds rearranged indices and their corresponding information.
   \param [in] indexInfoMap index and its corresponding info (usually server index)
…
  // Calculate from how many clients each client receives messages.
  // Calculate size of buffer for receiving message
- int recvNbClient, recvNbIndexCount;
+ std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
-              recvNbClient, recvNbIndexCount);
+              recvRankClient, recvNbIndexClientCount);
+
+ int recvNbIndexCount = 0;
+ for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
+   recvNbIndexCount += recvNbIndexClientCount[idx];
+
+ unsigned long* recvIndexBuff;
+ unsigned char* recvInfoBuff;
+ if (0 != recvNbIndexCount)
+ {
+   recvIndexBuff = new unsigned long[recvNbIndexCount];
+   recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
+ }

  // If a client holds indices and corresponding information which don't belong to it,
  // it will send a message to the correct clients.
  // Contents of the message are the indices and their corresponding information
- std::list<MPI_Request> sendRequest;
+ std::vector<MPI_Request> request;
+ int currentIndex = 0;
+ int nbRecvClient = recvRankClient.size();
+ for (int idx = 0; idx < nbRecvClient; ++idx)
+ {
+   if (0 != recvNbIndexClientCount[idx])
+   {
+     recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
+     recvInfoFromClients(recvRankClient[idx],
+                         recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
+                         recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
+                         commLevel, request);
+   }
+   currentIndex += recvNbIndexClientCount[idx];
+ }
+
  std::map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                    iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
-   sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, sendRequest);
+   sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
  std::map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
                                          iteInfo = client2ClientInfo.end();
  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
-   sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, sendRequest);
-
-
- unsigned long* recvIndexBuff = new unsigned long[recvNbIndexCount];
- unsigned char* recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
-
- std::map<int, MPI_Request>::iterator itRequestIndex, itRequestInfo;
- std::map<int, int> countBuffInfo, countBuffIndex;
- std::vector<int> processedList;
-
- bool isFinished = (0 == recvNbClient) ? true : false;
-
- // Count of the receive buffer for global indices
- int countIndex = 0;
-
- // Count of the receive buffer for server indices
- int countInfo = 0;
-
- // Requests returned by MPI_Irecv for global indices
- std::map<int, MPI_Request> requestRecvIndex, requestRecvInfo;
-
- // Maps a client rank to the beginning of the receive buffer for the global-index message from that client
- std::map<int, unsigned long*> indexBuffBegin;
-
- // Maps a client rank to the beginning of the receive buffer for the server-index message from that client
- std::map<int, unsigned char*> infoBuffBegin;
+   sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);
+
+ std::vector<MPI_Status> status(request.size());
+ MPI_Waitall(request.size(), &request[0], &status[0]);

  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
-
- // Now each client tries to listen for demands from others.
- // If it has a message, it processes it: pushing the global index and corresponding server into its map
- while (!isFinished || (!sendRequest.empty()))
- {
-   testSendRequest(sendRequest);
-   probeIndexMessageFromClients(recvIndexBuff, recvNbIndexCount,
-                                countIndex, indexBuffBegin,
-                                requestRecvIndex, commLevel);
-   // Processing complete requests
-   for (itRequestIndex = requestRecvIndex.begin();
-        itRequestIndex != requestRecvIndex.end();
-        ++itRequestIndex)
-   {
-     int rank = itRequestIndex->first;
-     int count = computeBuffCountIndex(itRequestIndex->second);
-     if (0 != count)
-       countBuffIndex[rank] = count;
-   }
-
-   probeInfoMessageFromClients(recvInfoBuff, recvNbIndexCount,
-                               countInfo, infoBuffBegin,
-                               requestRecvInfo, commLevel);
-   for (itRequestInfo = requestRecvInfo.begin();
-        itRequestInfo != requestRecvInfo.end();
-        ++itRequestInfo)
-   {
-     int rank = itRequestInfo->first;
-     int count = computeBuffCountInfo(itRequestInfo->second);
-     if (0 != count)
-       countBuffInfo[rank] = count;
-   }
-
-   for (std::map<int, int>::iterator it = countBuffIndex.begin();
-        it != countBuffIndex.end(); ++it)
-   {
-     int rank = it->first;
-     if ((countBuffInfo.end() != countBuffInfo.find(rank)) &&
-         (countBuffIndex.end() != countBuffIndex.find(rank)))
-     {
-       int count = it->second;
-       InfoType infoValue;
-       int infoIndex = 0;
-       for (int i = 0; i < count; ++i)
-       {
-         ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuffBegin[rank], infoIndex);
-         indexToInfoMapping.insert(std::make_pair<size_t,InfoType>(*(indexBuffBegin[rank]+i),infoValue));
-       }
-
-       processedList.push_back(rank);
-       --recvNbClient;
-     }
-   }
-
-   for (int i = 0; i < processedList.size(); ++i)
-   {
-     requestRecvInfo.erase(processedList[i]);
-     requestRecvIndex.erase(processedList[i]);
-     countBuffIndex.erase(processedList[i]);
-     countBuffInfo.erase(processedList[i]);
-   }
-
-   if (0 == recvNbClient) isFinished = true;
- }
-
- for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex) delete [] itIndex->second;
- for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo) delete [] itInfo->second;
- delete [] recvIndexBuff;
- delete [] recvInfoBuff;
+ currentIndex = 0;
+ InfoType infoValue;
+ int infoIndex = 0;
+ unsigned char* infoBuff = recvInfoBuff;
+ for (int idx = 0; idx < nbRecvClient; ++idx)
+ {
+   int count = recvNbIndexClientCount[idx];
+   for (int i = 0; i < count; ++i)
+   {
+     ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
+     indexToInfoMapping[*(recvIndexBuff+currentIndex+i)] = infoValue;
+   }
+   currentIndex += count;
+ }
+
+ if (0 != recvNbIndexCount)
+ {
+   delete [] recvIndexBuff;
+   delete [] recvInfoBuff;
+ }
+ for (std::map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
+      it != client2ClientInfo.end(); ++it)
+   delete [] it->second;
+
+ for (std::map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
+      it != client2ClientIndex.end(); ++it)
+   delete [] it->second;

  // Ok, now do something recursive
…
  }
  else
-   index2InfoMapping_ = indexToInfoMapping;
+   index2InfoMapping_ = (indexToInfoMapping);
 }

-/*!
-  Probe and receive message containing global indices from other clients.
-  Each client can send a message of global indices to other clients to fulfill their maps.
-  Each client probes its message queue; if a message is ready, it is put into the receive buffer
-  \param [in] recvIndexBuff buffer dedicated to receiving global indices
-  \param [in] recvNbIndexCount size of the buffer
-  \param [in] countIndex number of received indices
-  \param [in] indexBuffBegin beginning of the index buffer for each source rank
-  \param [in] requestRecvIndex requests of the receives
-  \param [in] intraComm communicator
-*/
-template<typename T, typename H>
-void CClientClientDHTTemplate<T,H>::probeIndexMessageFromClients(unsigned long* recvIndexBuff,
-                                                                 const int recvNbIndexCount,
-                                                                 int& countIndex,
-                                                                 std::map<int, unsigned long*>& indexBuffBegin,
-                                                                 std::map<int, MPI_Request>& requestRecvIndex,
-                                                                 const MPI_Comm& intraComm)
-{
-  MPI_Status statusIndexGlobal;
-  int flagIndexGlobal, count;
-
-  // Probing for global index
-  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INDEX, intraComm, &flagIndexGlobal, &statusIndexGlobal);
-  if ((true == flagIndexGlobal) && (countIndex < recvNbIndexCount))
-  {
-    MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
-    indexBuffBegin.insert(std::make_pair<int, unsigned long*>(statusIndexGlobal.MPI_SOURCE, recvIndexBuff+countIndex));
-    MPI_Irecv(recvIndexBuff+countIndex, count, MPI_UNSIGNED_LONG,
-              statusIndexGlobal.MPI_SOURCE, MPI_DHT_INDEX, intraComm,
-              &requestRecvIndex[statusIndexGlobal.MPI_SOURCE]);
-    countIndex += count;
-  }
-}
-
-/*!
-  Probe and receive message containing server indices from other clients.
-  Each client can send a message of server indices to other clients to fulfill their maps.
-  Each client probes its message queue; if a message is ready, it is put into the receive buffer
-  \param [in] recvInfoBuff buffer dedicated to receiving server indices
-  \param [in] recvNbIndexCount size of the buffer
-  \param [in] countInfo number of received info elements
-  \param [in] infoBuffBegin beginning of the info buffer for each source rank
-  \param [in] requestRecvInfo requests of the receives
-  \param [in] intraComm communicator
-*/
-template<typename T, typename H>
-void CClientClientDHTTemplate<T,H>::probeInfoMessageFromClients(unsigned char* recvInfoBuff,
-                                                                const int recvNbIndexCount,
-                                                                int& countInfo,
-                                                                std::map<int, unsigned char*>& infoBuffBegin,
-                                                                std::map<int, MPI_Request>& requestRecvInfo,
-                                                                const MPI_Comm& intraComm)
-{
-  MPI_Status statusInfo;
-  int flagInfo, count;
-
-  // Probing for server index
-  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INFO, intraComm, &flagInfo, &statusInfo);
-  if ((true == flagInfo) && (countInfo < recvNbIndexCount))
-  {
-    MPI_Get_count(&statusInfo, MPI_CHAR, &count);
-    unsigned char* beginInfoBuff = recvInfoBuff+countInfo*infoTypeSize;
-    infoBuffBegin.insert(std::make_pair<int, unsigned char*>(statusInfo.MPI_SOURCE, beginInfoBuff));
-    MPI_Irecv(beginInfoBuff, count, MPI_CHAR,
-              statusInfo.MPI_SOURCE, MPI_DHT_INFO, intraComm,
-              &requestRecvInfo[statusInfo.MPI_SOURCE]);
-
-    countInfo += count/infoTypeSize;
-  }
-}
…
 /*!
   Send message containing indices to clients
   \param [in] clientDestRank rank of destination client
   \param [in] indices indices to send
+  \param [in] indiceSize size of the index array to send
   \param [in] clientIntraComm communication group of client
   \param [in] requestSendIndex list of sending requests
 */
 template<typename T, typename H>
 void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
                                                        const MPI_Comm& clientIntraComm,
-                                                       std::list<MPI_Request>& requestSendIndex)
+                                                       std::vector<MPI_Request>& requestSendIndex)
 {
   MPI_Request request;
…
+/*!
+  Receive message containing indices from other clients
+  \param [in] clientSrcRank rank of source client
+  \param [in] indices array to receive the indices into
+  \param [in] indiceSize size of the index array to receive
+  \param [in] clientIntraComm communication group of client
+  \param [in] requestRecvIndex list of receiving requests
+*/
+template<typename T, typename H>
+void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
+                                                         const MPI_Comm& clientIntraComm,
+                                                         std::vector<MPI_Request>& requestRecvIndex)
+{
+  MPI_Request request;
+  requestRecvIndex.push_back(request);
+  MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
+            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
+}
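One implicit assumption worth noting: recvIndexFromClients, like sendIndexToClients, hands size_t buffers to MPI as MPI_UNSIGNED_LONG, which is only a faithful match where the two types have the same width (LP64 Unix platforms). A guard of this kind, not part of the changeset and purely illustrative (C++11 static_assert; a compile-time assert macro would serve for C++03), would make the assumption checkable:

    #include <cstddef>
    // Fails to compile on platforms (e.g. LLP64 Windows) where unsigned long
    // is narrower than size_t, where the MPI_UNSIGNED_LONG exchange would corrupt data.
    static_assert(sizeof(std::size_t) == sizeof(unsigned long),
                  "size_t indices are exchanged as MPI_UNSIGNED_LONG");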
 /*!
   Send message containing information to clients
   \param [in] clientDestRank rank of destination client
-  \param [in] info server index to send
+  \param [in] info info array to send
+  \param [in] infoSize size of the info array to send
   \param [in] clientIntraComm communication group of client
   \param [in] requestSendInfo list of sending requests
 */
 template<typename T, typename H>
 void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
-                                                      const MPI_Comm& clientIntraComm, std::list<MPI_Request>& requestSendInfo)
+                                                      const MPI_Comm& clientIntraComm,
+                                                      std::vector<MPI_Request>& requestSendInfo)
 {
   MPI_Request request;
…
-/*!
-  Verify status of sending requests
-  \param [in] sendRequest sending requests to verify
-*/
-template<typename T, typename H>
-void CClientClientDHTTemplate<T,H>::testSendRequest(std::list<MPI_Request>& sendRequest)
-{
-  int flag = 0;
-  MPI_Status status;
-  std::list<MPI_Request>::iterator itRequest;
-  int sizeListRequest = sendRequest.size();
-  int idx = 0;
-  while (idx < sizeListRequest)
-  {
-    bool isErased = false;
-    for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest)
-    {
-      MPI_Test(&(*itRequest), &flag, &status);
-      if (true == flag)
-      {
-        isErased = true;
-        break;
-      }
-    }
-    if (true == isErased) sendRequest.erase(itRequest);
-    ++idx;
-  }
-}
-
-/*!
-  Compute size of message containing global indices
-  \param [in] requestRecv request of message
-*/
-template<typename T, typename H>
-int CClientClientDHTTemplate<T,H>::computeBuffCountIndex(MPI_Request& requestRecv)
-{
-  int flag, count = 0;
-  MPI_Status status;
-
-  MPI_Test(&requestRecv, &flag, &status);
-  if (true == flag)
-  {
-    MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
-  }
-
-  return count;
-}
-
-/*!
-  Compute size of message containing server indices
-  \param [in] requestRecv request of message
-*/
-template<typename T, typename H>
-int CClientClientDHTTemplate<T,H>::computeBuffCountInfo(MPI_Request& requestRecv)
-{
-  int flag, count = 0;
-  MPI_Status status;
-
-  MPI_Test(&requestRecv, &flag, &status);
-  if (true == flag)
-  {
-    MPI_Get_count(&status, MPI_CHAR, &count);
-  }
-
-  return (count/infoTypeSize);
-}
+/*!
+  Receive message containing information from other clients
+  \param [in] clientSrcRank rank of source client
+  \param [in] info array to receive the information into
+  \param [in] infoSize size of the info array to receive
+  \param [in] clientIntraComm communication group of client
+  \param [in] requestRecvInfo list of receiving requests
+*/
+template<typename T, typename H>
+void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
+                                                        const MPI_Comm& clientIntraComm,
+                                                        std::vector<MPI_Request>& requestRecvInfo)
+{
+  MPI_Request request;
+  requestRecvInfo.push_back(request);
+
+  MPI_Irecv(info, infoSize, MPI_CHAR,
+            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
+}
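recvInfoFromClients receives opaque MPI_CHAR bytes; turning them back into InfoType values is the job of ProcessDHTElement<InfoType>::unpackElement, whose cursor-style protocol the calling code relies on (assumed semantics: copy the element at buff+index, then advance index by the element size; the real serializer is defined elsewhere in XIOS). A minimal stand-in for that protocol, using plain memcpy for a trivially copyable info type:

    #include <cstring>
    #include <cstdio>

    static void packInt(const int& info, unsigned char* buff, int& index)
    {
      std::memcpy(buff + index, &info, sizeof(int));  // write at the cursor...
      index += sizeof(int);                           // ...and advance it
    }

    static void unpackInt(int& info, const unsigned char* buff, int& index)
    {
      std::memcpy(&info, buff + index, sizeof(int));  // read at the cursor...
      index += sizeof(int);                           // ...and advance it
    }

    int main()
    {
      unsigned char buff[2 * sizeof(int)];  // recvInfoBuff analogue: count * typeSize bytes
      int a = 0, b = 0;

      int w = 0;
      packInt(11, buff, w);                 // sender side: pack two elements back to back
      packInt(22, buff, w);

      int r = 0;
      unpackInt(a, buff, r);                // receiver side: same cursor discipline
      unpackInt(b, buff, r);
      std::printf("%d %d\n", a, b);         // prints: 11 22
      return 0;
    }

This is also why the wrappers size everything in bytes (count * ProcessDHTElement<InfoType>::typeSize()) rather than in elements.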
 /*!
-  Compute how many processes one process needs to send to and from how many processes it will receive
+  Compute how many processes one process needs to send to and from how many processes it will receive.
   This computation is based only on the hierarchical structure of the distributed hash table (e.g. the number of processes)
 */
 template<typename T, typename H>
…
+/*!
+  Compute the number of clients, as well as the corresponding number of elements each client will receive, when returning search results
+  \param [in] sendNbRank ranks of clients to send to
+  \param [in] sendNbElements number of elements to send to each client
+  \param [in] recvNbRank ranks of clients to receive from
+  \param [out] recvNbElements number of elements to receive from each client
+*/
+template<typename T, typename H>
+void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
+                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
+{
+  recvNbElements.resize(recvNbRank.size());
+  std::vector<MPI_Request> request(sendNbRank.size()+recvNbRank.size());
+  std::vector<MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());
+
+  int nRequest = 0;
+  for (int idx = 0; idx < recvNbRank.size(); ++idx)
+  {
+    MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
+              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
+    ++nRequest;
+  }
+
+  for (int idx = 0; idx < sendNbRank.size(); ++idx)
+  {
+    MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
+              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
+    ++nRequest;
+  }
+
+  MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
+}
+
 /*!
   Send and receive the number of processes each process needs to listen to, as well as the number
-  of indices it will receive
+  of indices it will receive during the initialization phase
+  \param [in] level current level
+  \param [in] sendNbRank ranks of clients to send to
+  \param [in] sendNbElements number of elements to send to each client
+  \param [out] recvNbRank ranks of clients to receive from
+  \param [out] recvNbElements number of elements to receive from each client
 */
 template<typename T, typename H>
 void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
                                                  const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
-                                                 int& recvNbRank, int& recvNbElements)
+                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
 {
   int groupBegin = this->getGroupBegin()[level];
…
   std::vector<int>& recvRank = recvRank_[level];
   int sendBuffSize = sendRank.size();
-  int* sendBuff = new int [sendBuffSize*2];
-  std::vector<MPI_Request> request(sendBuffSize);
-  std::vector<MPI_Status> requestStatus(sendBuffSize);
+  std::vector<int> sendBuff(sendBuffSize*2);
   int recvBuffSize = recvRank.size();
-  int* recvBuff = new int [2];
+  std::vector<int> recvBuff(recvBuffSize*2,0);
+
+  std::vector<MPI_Request> request(sendBuffSize+recvBuffSize);
+  std::vector<MPI_Status> requestStatus(sendBuffSize+recvBuffSize);
+
+  int nRequest = 0;
+  for (int idx = 0; idx < recvBuffSize; ++idx)
+  {
+    MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
+              recvRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
+    ++nRequest;
+  }

   for (int idx = 0; idx < sendBuffSize; ++idx)
…
   {
     MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
-              sendRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[idx]);
-  }
-
-  MPI_Status status;
+              sendRank[idx], MPI_DHT_INDEX_1, this->internalComm_,
+              &request[nRequest]);
+    ++nRequest;
+  }
+
+  MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
   int nbRecvRank = 0, nbRecvElements = 0;
+  recvNbRank.clear();
+  recvNbElements.clear();
   for (int idx = 0; idx < recvBuffSize; ++idx)
   {
-    MPI_Recv(recvBuff, 2, MPI_INT,
-             recvRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &status);
-    nbRecvRank += *(recvBuff);
-    nbRecvElements += *(recvBuff+1);
-  }
-
-  MPI_Waitall(sendBuffSize, &request[0], &requestStatus[0]);
-
-  recvNbRank = nbRecvRank;
-  recvNbElements = nbRecvElements;
-
-  delete [] sendBuff;
-  delete [] recvBuff;
+    if (0 != recvBuff[2*idx])
+    {
+      recvNbRank.push_back(recvRank[idx]);
+      recvNbElements.push_back(recvBuff[2*idx+1]);
+    }
+  }
 }

 }
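The sendRecvRank rewrite replaces the r835 sequence, an Isend to every target followed by a blocking MPI_Recv per source into a shared two-int buffer, with receives posted for all sources up front and a single MPI_Waitall, and it now reports per-rank vectors instead of grand totals. A standalone sketch of that handshake; the {flag, count} reading of each int pair is inferred from the receive side, since the code that fills sendBuff lies outside this hunk:

    #include <mpi.h>
    #include <vector>
    #include <cstdio>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int rank, size;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);
      MPI_Comm_size(MPI_COMM_WORLD, &size);

      const int tag = 7;                          // stand-in for MPI_DHT_INDEX_1
      std::vector<int> sendBuff(2 * size), recvBuff(2 * size, 0);
      for (int idx = 0; idx < size; ++idx)
      {
        sendBuff[2*idx]   = (idx != rank) ? 1 : 0;  // will this rank send anything there?
        sendBuff[2*idx+1] = 10 + rank;              // how many elements it would send
      }

      std::vector<MPI_Request> request(2 * size);
      int nRequest = 0;
      for (int idx = 0; idx < size; ++idx)        // post every receive first...
        MPI_Irecv(&recvBuff[2*idx], 2, MPI_INT, idx, tag, MPI_COMM_WORLD, &request[nRequest++]);
      for (int idx = 0; idx < size; ++idx)        // ...then all sends, then wait once
        MPI_Isend(&sendBuff[2*idx], 2, MPI_INT, idx, tag, MPI_COMM_WORLD, &request[nRequest++]);
      MPI_Waitall(nRequest, &request[0], MPI_STATUSES_IGNORE);

      int senders = 0;
      for (int idx = 0; idx < size; ++idx)        // keep only ranks that will really send
        if (recvBuff[2*idx] != 0) ++senders;
      std::printf("rank %d expects data from %d ranks\n", rank, senders);
      MPI_Finalize();
      return 0;
    }

Pre-posting every receive before any send makes the exchange deadlock-free regardless of message ordering, which the old one-at-a-time blocking MPI_Recv could not guarantee in general.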