Changeset 829 for XIOS/trunk/src/client_server_mapping_distributed.cpp
- Timestamp:
  03/23/16 16:11:01
- File:
  XIOS/trunk/src/client_server_mapping_distributed.cpp (1 edited)
Legend:
- Unmodified (no prefix)
- Added (+)
- Removed (-)
XIOS/trunk/src/client_server_mapping_distributed.cpp (r721 -> r829)
    \author Ha NGUYEN
    \since 27 Feb 2015
-   \date 06 Oct 2015
+   \date 16 Mars 2016

    \brief Mapping between index client and server.
…
  CClientServerMappingDistributed::CClientServerMappingDistributed(const boost::unordered_map<size_t,int>& globalIndexOfServer,
                                                                   const MPI_Comm& clientIntraComm, bool isDataDistributed)
-   : CClientServerMapping(), indexClientHash_(), countIndexGlobal_(0), countIndexServer_(0),
-     indexGlobalBuffBegin_(), indexServerBuffBegin_(), requestRecvIndexServer_(), isDataDistributed_(isDataDistributed),
-     ccDHT_(0)
+   : CClientServerMapping(), ccDHT_(0)
  {
-   clientIntraComm_ = clientIntraComm;
-   MPI_Comm_size(clientIntraComm,&(nbClient_));
-   MPI_Comm_rank(clientIntraComm,&clientRank_);
-   computeHashIndex();
-
    ccDHT_ = new CClientClientDHTInt(globalIndexOfServer,
                                     clientIntraComm,
                                     isDataDistributed);
-
-   // computeDistributedServerIndex(globalIndexOfServer, clientIntraComm);
  }
…
  {
    ccDHT_->computeIndexInfoMapping(globalIndexOnClient);
-   indexGlobalOnServer_ = (ccDHT_->getInfoIndexMap());
-
-   /*
-   size_t ssize = globalIndexOnClient.numElements(), hashedIndex;
-
-   std::vector<size_t>::const_iterator itbClientHash = indexClientHash_.begin(), itClientHash,
-                                       iteClientHash = indexClientHash_.end();
-   std::map<int, std::vector<size_t> > client2ClientIndexGlobal;
-   std::map<int, std::vector<int> > client2ClientIndexServer;
-
-   // Number of global index whose mapping server can be found out thanks to index-server mapping
-   int nbIndexAlreadyOnClient = 0;
-
-   // Number of global index whose mapping server are on other clients
-   int nbIndexSendToOthers = 0;
-   HashXIOS<size_t> hashGlobalIndex;
-   for (int i = 0; i < ssize; ++i)
-   {
-     size_t globalIndexClient = globalIndexOnClient(i);
-     hashedIndex = hashGlobalIndex(globalIndexClient);
-     itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedIndex);
-     if (iteClientHash != itClientHash)
-     {
-       int indexClient = std::distance(itbClientHash, itClientHash)-1;
-       {
-         client2ClientIndexGlobal[indexClient].push_back(globalIndexClient);
-         ++nbIndexSendToOthers;
-       }
-     }
-   }
-
-   int* sendBuff = new int[nbClient_];
-   for (int i = 0; i < nbClient_; ++i) sendBuff[i] = 0;
-   std::map<int, std::vector<size_t> >::iterator itb = client2ClientIndexGlobal.begin(), it,
-                                                 ite = client2ClientIndexGlobal.end();
-   for (it = itb; it != ite; ++it) sendBuff[it->first] = 1;
-   int* recvBuff = new int[nbClient_];
-   MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm_);
-
-   std::list<MPI_Request> sendRequest;
-   if (0 != nbIndexSendToOthers)
-     for (it = itb; it != ite; ++it)
-       sendIndexGlobalToClients(it->first, it->second, clientIntraComm_, sendRequest);
-
-   int nbDemandingClient = recvBuff[clientRank_], nbIndexServerReceived = 0;
-
-   // Receiving demand as well as the responds from other clients
-   // The demand message contains global index; meanwhile the responds have server index information
-   // Buffer to receive demand from other clients, it can be allocated or not depending whether it has demand(s)
-   // There are some cases we demand duplicate index so need to determine maxium size of demanding buffer
-   for (it = itb; it != ite; ++it) sendBuff[it->first] = (it->second).size();
-   MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm_);
-
-   unsigned long* recvBuffIndexGlobal = 0;
-   int maxNbIndexDemandedFromOthers = recvBuff[clientRank_];
-   if (!isDataDistributed_) maxNbIndexDemandedFromOthers = nbDemandingClient * nbIndexSendToOthers; //globalIndexToServerMapping_.size(); // Not very optimal but it's general
-
-   if (0 != maxNbIndexDemandedFromOthers)
-     recvBuffIndexGlobal = new unsigned long[maxNbIndexDemandedFromOthers];
-
-   // Buffer to receive respond from other clients, it can be allocated or not depending whether it demands other clients
-   int* recvBuffIndexServer = 0;
-   int nbIndexReceivedFromOthers = nbIndexSendToOthers;
-   if (0 != nbIndexReceivedFromOthers)
-     recvBuffIndexServer = new int[nbIndexReceivedFromOthers];
-
-   std::map<int, MPI_Request>::iterator itRequest;
-   std::vector<int> demandAlreadyReceived, repondAlreadyReceived;
-
-   resetReceivingRequestAndCount();
-   while ((0 < nbDemandingClient) || (!sendRequest.empty()) ||
-          (nbIndexServerReceived < nbIndexReceivedFromOthers))
-   {
-     // Just check whether a client has any demand from other clients.
-     // If it has, then it should send responds to these client(s)
-     probeIndexGlobalMessageFromClients(recvBuffIndexGlobal, maxNbIndexDemandedFromOthers);
-     if (0 < nbDemandingClient)
-     {
-       for (itRequest = requestRecvIndexGlobal_.begin();
-            itRequest != requestRecvIndexGlobal_.end(); ++itRequest)
-       {
-         int flagIndexGlobal, count;
-         MPI_Status statusIndexGlobal;
-
-         MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal);
-         if (true == flagIndexGlobal)
-         {
-           MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
-           int clientSourceRank = statusIndexGlobal.MPI_SOURCE;
-           unsigned long* beginBuff = indexGlobalBuffBegin_[clientSourceRank];
-           for (int i = 0; i < count; ++i)
-           {
-             client2ClientIndexServer[clientSourceRank].push_back(globalIndexToServerMapping_[*(beginBuff+i)]);
-           }
-           sendIndexServerToClients(clientSourceRank, client2ClientIndexServer[clientSourceRank], clientIntraComm_, sendRequest);
-           --nbDemandingClient;
-
-           demandAlreadyReceived.push_back(clientSourceRank);
-         }
-       }
-       for (int i = 0; i < demandAlreadyReceived.size(); ++i)
-         requestRecvIndexGlobal_.erase(demandAlreadyReceived[i]);
-     }
-
-     testSendRequest(sendRequest);
-
-     // In some cases, a client need to listen respond from other clients about server information
-     // Ok, with the information, a client can fill in its server-global index map.
-     probeIndexServerMessageFromClients(recvBuffIndexServer, nbIndexReceivedFromOthers);
-     for (itRequest = requestRecvIndexServer_.begin();
-          itRequest != requestRecvIndexServer_.end();
-          ++itRequest)
-     {
-       int flagIndexServer, count;
-       MPI_Status statusIndexServer;
-
-       MPI_Test(&(itRequest->second), &flagIndexServer, &statusIndexServer);
-       if (true == flagIndexServer)
-       {
-         MPI_Get_count(&statusIndexServer, MPI_INT, &count);
-         int clientSourceRank = statusIndexServer.MPI_SOURCE;
-         int* beginBuff = indexServerBuffBegin_[clientSourceRank];
-         std::vector<size_t>& globalIndexTmp = client2ClientIndexGlobal[clientSourceRank];
-         for (int i = 0; i < count; ++i)
-         {
-           (indexGlobalOnServer_[*(beginBuff+i)]).push_back(globalIndexTmp[i]);
-         }
-         nbIndexServerReceived += count;
-         repondAlreadyReceived.push_back(clientSourceRank);
-       }
-     }
-
-     for (int i = 0; i < repondAlreadyReceived.size(); ++i)
-       requestRecvIndexServer_.erase(repondAlreadyReceived[i]);
-     repondAlreadyReceived.resize(0);
-   }
-
-   if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndexGlobal;
-   if (0 != nbIndexReceivedFromOthers) delete [] recvBuffIndexServer;
-   delete [] sendBuff;
-   delete [] recvBuff;
-   */
+   const boost::unordered_map<size_t,int>& infoIndexMap = (ccDHT_->getInfoIndexMap());
+ //  indexGlobalOnServer_ = (ccDHT_->getInfoIndexMap());
+   boost::unordered_map<size_t,int>::const_iterator it = infoIndexMap.begin(), ite = infoIndexMap.end();
+   for (; it != ite; ++it)
+   {
+     indexGlobalOnServer_[it->second].push_back(it->first);
+   }
  }

- /*!
-   Compute the hash index distribution of whole size_t space then each client will have a range of this distribution
- */
- void CClientServerMappingDistributed::computeHashIndex()
- {
-   // Compute range of hash index for each client
-   indexClientHash_.resize(nbClient_+1);
-   size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
-   size_t nbHashIndex;
-   indexClientHash_[0] = 0;
-   for (int i = 1; i < nbClient_; ++i)
-   {
-     nbHashIndex = nbHashIndexMax / nbClient_;
-     if (i < (nbHashIndexMax%nbClient_)) ++nbHashIndex;
-     indexClientHash_[i] = indexClientHash_[i-1] + nbHashIndex;
-   }
-   indexClientHash_[nbClient_] = nbHashIndexMax;
- }
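To see what the + lines above compute: the DHT query hands back a map from global index to the rank of the server holding it, and the added loop inverts that map into indexGlobalOnServer_, grouping global indices by server rank. Below is a minimal, self-contained sketch of the same inversion; it is not part of the changeset, std::unordered_map stands in for the boost container, and invertIndexMap is a hypothetical name.

    #include <cstddef>
    #include <unordered_map>
    #include <vector>

    // Invert an index-to-rank map, as returned by the DHT, into a
    // rank-to-indices map: the grouping the added loop builds in
    // indexGlobalOnServer_.
    std::unordered_map<int, std::vector<std::size_t> >
    invertIndexMap(const std::unordered_map<std::size_t, int>& indexToRank)
    {
      std::unordered_map<int, std::vector<std::size_t> > rankToIndices;
      std::unordered_map<std::size_t, int>::const_iterator it = indexToRank.begin();
      for (; it != indexToRank.end(); ++it)
        rankToIndices[it->second].push_back(it->first);  // key: server rank
      return rankToIndices;
    }

One vector per server rank is exactly the shape the sending side needs later: one message per server, carrying every global index that lives there.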
- /*!
-   Compute distribution of global index for servers
-   Each client already holds a piece of information about global index and the corresponding server.
-   This information is redistributed into size_t space in which each client possesses a specific range of index.
-   After the redistribution, each client as well as its range of index contains all necessary information about server.
-   \param [in] globalIndexOfServer global index and the corresponding server
-   \param [in] clientIntraComm client joining distribution process.
- */
- void CClientServerMappingDistributed::computeDistributedServerIndex(const boost::unordered_map<size_t,int>& globalIndexOfServer,
-                                                                     const MPI_Comm& clientIntraComm)
- {
-   int* sendBuff = new int[nbClient_];
-   int* sendNbIndexBuff = new int[nbClient_];
-   for (int i = 0; i < nbClient_; ++i)
-   {
-     sendBuff[i] = 0; sendNbIndexBuff[i] = 0;
-   }
-
-   // Compute size of sending and receving buffer
-   std::map<int, std::vector<size_t> > client2ClientIndexGlobal;
-   std::map<int, std::vector<int> > client2ClientIndexServer;
-
-   std::vector<size_t>::const_iterator itbClientHash = indexClientHash_.begin(), itClientHash,
-                                       iteClientHash = indexClientHash_.end();
-   boost::unordered_map<size_t,int>::const_iterator it = globalIndexOfServer.begin(),
-                                                    ite = globalIndexOfServer.end();
-   HashXIOS<size_t> hashGlobalIndex;
-   for (; it != ite; ++it)
-   {
-     size_t hashIndex = hashGlobalIndex(it->first);
-     itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
-     if (itClientHash != iteClientHash)
-     {
-       int indexClient = std::distance(itbClientHash, itClientHash)-1;
-       {
-         sendBuff[indexClient] = 1;
-         ++sendNbIndexBuff[indexClient];
-         client2ClientIndexGlobal[indexClient].push_back(it->first);
-         client2ClientIndexServer[indexClient].push_back(it->second);
-       }
-     }
-   }
-
-   // Calculate from how many clients each client receive message.
-   int* recvBuff = new int[nbClient_];
-   MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm);
-   int recvNbClient = recvBuff[clientRank_];
-
-   // Calculate size of buffer for receiving message
-   int* recvNbIndexBuff = new int[nbClient_];
-   MPI_Allreduce(sendNbIndexBuff, recvNbIndexBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm);
-   int recvNbIndexCount = recvNbIndexBuff[clientRank_];
-   unsigned long* recvIndexGlobalBuff = new unsigned long[recvNbIndexCount];
-   int* recvIndexServerBuff = new int[recvNbIndexCount];
-
-   // If a client holds information about global index and servers which don't belong to it,
-   // it will send a message to the correct clients.
-   // Contents of the message are global index and its corresponding server index
-   std::list<MPI_Request> sendRequest;
-   std::map<int, std::vector<size_t> >::iterator itGlobal = client2ClientIndexGlobal.begin(),
-                                                 iteGlobal = client2ClientIndexGlobal.end();
-   for ( ;itGlobal != iteGlobal; ++itGlobal)
-     sendIndexGlobalToClients(itGlobal->first, itGlobal->second, clientIntraComm, sendRequest);
-   std::map<int, std::vector<int> >::iterator itServer = client2ClientIndexServer.begin(),
-                                              iteServer = client2ClientIndexServer.end();
-   for (; itServer != iteServer; ++itServer)
-     sendIndexServerToClients(itServer->first, itServer->second, clientIntraComm, sendRequest);
-
-   std::map<int, MPI_Request>::iterator itRequestIndexGlobal, itRequestIndexServer;
-   std::map<int, int> countBuffIndexServer, countBuffIndexGlobal;
-   std::vector<int> processedList;
-
-   bool isFinished = (0 == recvNbClient) ? true : false;
-
-   // Just to make sure before listening message, all counting index and receiving request have already beeen reset
-   resetReceivingRequestAndCount();
-
-   // Now each client trys to listen to demand from others.
-   // If they have message, it processes: pushing global index and corresponding server to its map
-   while (!isFinished || (!sendRequest.empty()))
-   {
-     testSendRequest(sendRequest);
-     probeIndexGlobalMessageFromClients(recvIndexGlobalBuff, recvNbIndexCount);
-
-     // Processing complete request
-     for (itRequestIndexGlobal = requestRecvIndexGlobal_.begin();
-          itRequestIndexGlobal != requestRecvIndexGlobal_.end();
-          ++itRequestIndexGlobal)
-     {
-       int rank = itRequestIndexGlobal->first;
-       int countIndexGlobal = computeBuffCountIndexGlobal(itRequestIndexGlobal->second);
-       if (0 != countIndexGlobal)
-         countBuffIndexGlobal[rank] = countIndexGlobal;
-     }
-
-     probeIndexServerMessageFromClients(recvIndexServerBuff, recvNbIndexCount);
-     for (itRequestIndexServer = requestRecvIndexServer_.begin();
-          itRequestIndexServer != requestRecvIndexServer_.end();
-          ++itRequestIndexServer)
-     {
-       int rank = itRequestIndexServer->first;
-       int countIndexServer = computeBuffCountIndexServer(itRequestIndexServer->second);
-       if (0 != countIndexServer)
-         countBuffIndexServer[rank] = countIndexServer;
-     }
-
-     for (std::map<int, int>::iterator it = countBuffIndexGlobal.begin();
-          it != countBuffIndexGlobal.end(); ++it)
-     {
-       int rank = it->first;
-       if (countBuffIndexServer.end() != countBuffIndexServer.find(rank))
-       {
-         processReceivedRequest(indexGlobalBuffBegin_[rank], indexServerBuffBegin_[rank], it->second);
-         processedList.push_back(rank);
-         --recvNbClient;
-       }
-     }
-
-     for (int i = 0; i < processedList.size(); ++i)
-     {
-       requestRecvIndexServer_.erase(processedList[i]);
-       requestRecvIndexGlobal_.erase(processedList[i]);
-       countBuffIndexGlobal.erase(processedList[i]);
-       countBuffIndexServer.erase(processedList[i]);
-     }
-
-     if (0 == recvNbClient) isFinished = true;
-   }
-
-   delete [] sendBuff;
-   delete [] sendNbIndexBuff;
-   delete [] recvBuff;
-   delete [] recvNbIndexBuff;
-   delete [] recvIndexGlobalBuff;
-   delete [] recvIndexServerBuff;
- }
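The removed routine above relies on a sizing pattern that recurs in this file: each rank marks, per destination, a 0/1 flag and an element count, and two summing MPI_Allreduce calls then tell every rank how many peers will message it and how large its receive buffers must be. Here is a compact sketch of just that counting step; countIncoming and nbItemsForRank are hypothetical names, nbItemsForRank is assumed to have one entry per rank, and an initialized MPI environment is assumed.

    #include <mpi.h>
    #include <vector>

    // Given how many items this rank will send to each destination,
    // compute how many peers will send to *this* rank and how many
    // items it will receive in total, via two summing allreduces.
    void countIncoming(const std::vector<int>& nbItemsForRank, MPI_Comm comm,
                       int& nbSenders, int& nbItems)
    {
      int nbProc, rank;
      MPI_Comm_size(comm, &nbProc);
      MPI_Comm_rank(comm, &rank);

      std::vector<int> sendFlag(nbProc, 0), sendCount(nbItemsForRank);
      for (int dest = 0; dest < nbProc; ++dest)
        if (0 < nbItemsForRank[dest]) sendFlag[dest] = 1;

      std::vector<int> recvFlag(nbProc), recvCount(nbProc);
      MPI_Allreduce(&sendFlag[0], &recvFlag[0], nbProc, MPI_INT, MPI_SUM, comm);
      MPI_Allreduce(&sendCount[0], &recvCount[0], nbProc, MPI_INT, MPI_SUM, comm);

      nbSenders = recvFlag[rank];   // how many peers target this rank
      nbItems   = recvCount[rank];  // total items this rank will receive
    }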
- /*!
-   Probe and receive message containg global index from other clients.
-   Each client can send a message of global index to other clients to fulfill their maps.
-   Each client probes message from its queue then if the message is ready, it will be put into the receiving buffer
-   \param [in] recvIndexGlobalBuff buffer dedicated for receiving global index
-   \param [in] recvNbIndexCount size of the buffer
- */
- void CClientServerMappingDistributed::probeIndexGlobalMessageFromClients(unsigned long* recvIndexGlobalBuff, int recvNbIndexCount)
- {
-   MPI_Status statusIndexGlobal;
-   int flagIndexGlobal, count;
-
-   // Probing for global index
-   MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INDEX_0, clientIntraComm_, &flagIndexGlobal, &statusIndexGlobal);
-   if ((true == flagIndexGlobal) && (countIndexGlobal_ < recvNbIndexCount))
-   {
-     MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
-     indexGlobalBuffBegin_.insert(std::make_pair<int, unsigned long*>(statusIndexGlobal.MPI_SOURCE, recvIndexGlobalBuff+countIndexGlobal_));
-     MPI_Irecv(recvIndexGlobalBuff+countIndexGlobal_, count, MPI_UNSIGNED_LONG,
-               statusIndexGlobal.MPI_SOURCE, MPI_DHT_INDEX_0, clientIntraComm_,
-               &requestRecvIndexGlobal_[statusIndexGlobal.MPI_SOURCE]);
-     countIndexGlobal_ += count;
-   }
- }
-
- /*!
-   Probe and receive message containg server index from other clients.
-   Each client can send a message of server index to other clients to fulfill their maps.
-   Each client probes message from its queue then if the message is ready, it will be put into the receiving buffer
-   \param [in] recvIndexServerBuff buffer dedicated for receiving server index
-   \param [in] recvNbIndexCount size of the buffer
- */
- void CClientServerMappingDistributed::probeIndexServerMessageFromClients(int* recvIndexServerBuff, int recvNbIndexCount)
- {
-   MPI_Status statusIndexServer;
-   int flagIndexServer, count;
-
-   // Probing for server index
-   MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INFO_0, clientIntraComm_, &flagIndexServer, &statusIndexServer);
-   if ((true == flagIndexServer) && (countIndexServer_ < recvNbIndexCount))
-   {
-     MPI_Get_count(&statusIndexServer, MPI_INT, &count);
-     indexServerBuffBegin_.insert(std::make_pair<int, int*>(statusIndexServer.MPI_SOURCE, recvIndexServerBuff+countIndexServer_));
-     MPI_Irecv(recvIndexServerBuff+countIndexServer_, count, MPI_INT,
-               statusIndexServer.MPI_SOURCE, MPI_DHT_INFO_0, clientIntraComm_,
-               &requestRecvIndexServer_[statusIndexServer.MPI_SOURCE]);
-
-     countIndexServer_ += count;
-   }
- }
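Both removed probe routines implement one nonblocking-receive recipe: MPI_Iprobe for a tagged message from any source, MPI_Get_count to learn its length, then MPI_Irecv into the next unused slice of a preallocated buffer while recording where each source's data begins. A stripped-down sketch of that recipe follows; probeOne and TAG_INDEX are hypothetical, where the real code uses XIOS tags such as MPI_DHT_INDEX_0 and member-variable bookkeeping.

    #include <mpi.h>
    #include <map>

    const int TAG_INDEX = 15;  // hypothetical tag

    // Probe for one pending message and post a nonblocking receive into
    // the next free region of recvBuff, remembering where the data of
    // each source rank starts.
    void probeOne(unsigned long* recvBuff, int capacity, int& used, MPI_Comm comm,
                  std::map<int, unsigned long*>& buffBegin,  // source rank -> data start
                  std::map<int, MPI_Request>& pendingRecv)   // source rank -> request
    {
      int flag = 0;
      MPI_Status status;
      MPI_Iprobe(MPI_ANY_SOURCE, TAG_INDEX, comm, &flag, &status);
      if (flag && used < capacity)
      {
        int count;
        MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
        buffBegin[status.MPI_SOURCE] = recvBuff + used;
        MPI_Irecv(recvBuff + used, count, MPI_UNSIGNED_LONG, status.MPI_SOURCE,
                  TAG_INDEX, comm, &pendingRecv[status.MPI_SOURCE]);
        used += count;
      }
    }

The probe-then-receive split is what lets the caller size each MPI_Irecv exactly and pack all incoming messages contiguously into a single buffer.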
- /*!
-   Send message containing global index to clients
-   \param [in] clientDestRank rank of destination client
-   \param [in] indexGlobal global index to send
-   \param [in] clientIntraComm communication group of client
-   \param [in] requestSendIndexGlobal list of sending request
- */
- void CClientServerMappingDistributed::sendIndexGlobalToClients(int clientDestRank, std::vector<size_t>& indexGlobal,
-                                                                const MPI_Comm& clientIntraComm,
-                                                                std::list<MPI_Request>& requestSendIndexGlobal)
- {
-   MPI_Request request;
-   requestSendIndexGlobal.push_back(request);
-   MPI_Isend(&(indexGlobal)[0], (indexGlobal).size(), MPI_UNSIGNED_LONG,
-             clientDestRank, MPI_DHT_INDEX_0, clientIntraComm, &(requestSendIndexGlobal.back()));
- }
-
- /*!
-   Send message containing server index to clients
-   \param [in] clientDestRank rank of destination client
-   \param [in] indexServer server index to send
-   \param [in] clientIntraComm communication group of client
-   \param [in] requestSendIndexServer list of sending request
- */
- void CClientServerMappingDistributed::sendIndexServerToClients(int clientDestRank, std::vector<int>& indexServer,
-                                                                const MPI_Comm& clientIntraComm,
-                                                                std::list<MPI_Request>& requestSendIndexServer)
- {
-   MPI_Request request;
-   requestSendIndexServer.push_back(request);
-   MPI_Isend(&(indexServer)[0], (indexServer).size(), MPI_INT,
-             clientDestRank, MPI_DHT_INFO_0, clientIntraComm, &(requestSendIndexServer.back()));
- }
-
- /*!
-   Verify status of sending request
-   \param [in] sendRequest sending request to verify
- */
- void CClientServerMappingDistributed::testSendRequest(std::list<MPI_Request>& sendRequest)
- {
-   int flag = 0;
-   MPI_Status status;
-   std::list<MPI_Request>::iterator itRequest;
-   int sizeListRequest = sendRequest.size();
-   int idx = 0;
-   while (idx < sizeListRequest)
-   {
-     bool isErased = false;
-     for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest)
-     {
-       MPI_Test(&(*itRequest), &flag, &status);
-       if (true == flag)
-       {
-         isErased = true;
-         break;
-       }
-     }
-     if (true == isErased) sendRequest.erase(itRequest);
-     ++idx;
-   }
- }
-
- /*!
-   Process the received request. Pushing global index and server index into map
-   \param[in] buffIndexGlobal pointer to the begining of buffer containing global index
-   \param[in] buffIndexServer pointer to the begining of buffer containing server index
-   \param[in] count size of received message
- */
- void CClientServerMappingDistributed::processReceivedRequest(unsigned long* buffIndexGlobal, int* buffIndexServer, int count)
- {
-   for (int i = 0; i < count; ++i)
-     globalIndexToServerMapping_.insert(std::make_pair<size_t,int>(*(buffIndexGlobal+i),*(buffIndexServer+i)));
- }
-
- /*!
-   Compute size of message containing global index
-   \param[in] requestRecv request of message
- */
- int CClientServerMappingDistributed::computeBuffCountIndexGlobal(MPI_Request& requestRecv)
- {
-   int flag, count = 0;
-   MPI_Status status;
-
-   MPI_Test(&requestRecv, &flag, &status);
-   if (true == flag)
-   {
-     MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
-   }
-
-   return count;
- }
-
- /*!
-   Compute size of message containing server index
-   \param[in] requestRecv request of message
- */
- int CClientServerMappingDistributed::computeBuffCountIndexServer(MPI_Request& requestRecv)
- {
-   int flag, count = 0;
-   MPI_Status status;
-
-   MPI_Test(&requestRecv, &flag, &status);
-   if (true == flag)
-   {
-     MPI_Get_count(&status, MPI_INT, &count);
-   }
-
-   return count;
- }
-
- /*!
-   Reset all receiving request map and counter
- */
- void CClientServerMappingDistributed::resetReceivingRequestAndCount()
- {
-   countIndexGlobal_ = countIndexServer_ = 0;
-   requestRecvIndexGlobal_.clear();
-   requestRecvIndexServer_.clear();
-   indexGlobalBuffBegin_.clear();
-   indexServerBuffBegin_.clear();
- }

  }
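The removed testSendRequest walks a list of pending MPI_Isend requests and erases those that MPI_Test reports complete, restarting its scan after every erase. The same effect can be had in a single pass; the sketch below is not the original code, and eraseCompleted is a hypothetical name.

    #include <mpi.h>
    #include <list>

    // Erase every request MPI_Test reports complete, leaving only the
    // sends that are still in flight.
    void eraseCompleted(std::list<MPI_Request>& pending)
    {
      std::list<MPI_Request>::iterator it = pending.begin();
      while (it != pending.end())
      {
        int flag = 0;
        MPI_Status status;
        MPI_Test(&(*it), &flag, &status);
        if (flag) it = pending.erase(it);  // erase returns the next iterator
        else ++it;
      }
    }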