Changeset 569 for XIOS/trunk/src/client_server_mapping_distributed.cpp
- Timestamp:
- 03/10/15 10:49:13 (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/trunk/src/client_server_mapping_distributed.cpp
r568 r569 1 /*! 2 \file client_server_mapping.hpp 3 \author Ha NGUYEN 4 \since 27 Feb 2015 5 \date 09 Mars 2015 6 7 \brief Mapping between index client and server. 8 Clients pre-calculate all information of server distribution. 9 */ 1 10 #include "client_server_mapping_distributed.hpp" 2 11 #include <limits> … … 7 16 8 17 CClientServerMappingDistributed::CClientServerMappingDistributed(const boost::unordered_map<size_t,int>& globalIndexOfServer, 9 const MPI_Comm& clientIntraComm) : CClientServerMapping(), indexClientHash_() 18 const MPI_Comm& clientIntraComm) 19 : CClientServerMapping(), indexClientHash_(), countIndexGlobal_(0), countIndexServer_(0), 20 indexGlobalBuffBegin_(), indexServerBuffBegin_(), requestRecvIndexServer_() 10 21 { 11 22 clientIntraComm_ = clientIntraComm; 12 23 MPI_Comm_size(clientIntraComm,&(nbClient_)); 13 MPI_Comm_rank(clientIntraComm,&clientRank_) ; 24 MPI_Comm_rank(clientIntraComm,&clientRank_); 25 computeHashIndex(); 14 26 computeDistributedServerIndex(globalIndexOfServer, clientIntraComm); 15 27 } … … 17 29 CClientServerMappingDistributed::~CClientServerMappingDistributed() 18 30 { 19 20 } 21 31 } 32 33 /*! 34 Compute mapping global index of server which client sends to. 35 \param [in] globalIndexOnClient global index client has 36 */ 37 void CClientServerMappingDistributed::computeServerIndexMapping(const CArray<size_t,1>& globalIndexOnClient) 38 { 39 int ssize = globalIndexOnClient.numElements(); 40 CArray<int,1>* localIndexOnClient = new CArray<int,1>(ssize); 41 for (int i = 0; i < ssize; ++i) (*localIndexOnClient)(i) = i; 42 43 this->computeServerIndexMapping(globalIndexOnClient, *localIndexOnClient); 44 delete localIndexOnClient; 45 } 46 47 /*! 48 Compute mapping global index of server which client sends to. 
49 \param [in] globalIndexOnClient global index client has 50 \param [in] localIndexOnClient local index on client 51 */ 22 52 void CClientServerMappingDistributed::computeServerIndexMapping(const CArray<size_t,1>& globalIndexOnClient, 23 53 const CArray<int,1>& localIndexOnClient) … 89 119 recvBuffIndexServer = new int[nbIndexReceivedFromOthers]; 90 120 91 resetRequestAndCount(); 121 resetReceivingRequestAndCount(); 92 122 std::map<int, MPI_Request>::iterator itRequest; 93 123 std::vector<int> demandAlreadyReceived, repondAlreadyReceived; … 168 198 } 169 199 200 /*! 201 Compute the hash index distribution of whole size_t space then each client will have a range of this distribution 202 */ 170 203 void CClientServerMappingDistributed::computeHashIndex() 171 204 { … 184 217 } 185 218 219 /*! 220 Compute distribution of global index for servers 221 Each client already holds a piece of information about global index and the corresponding server. 222 This information is redistributed into size_t space in which each client possesses a specific range of index. 223 After the redistribution, each client's range of index contains all necessary information about the servers. 224 \param [in] globalIndexOfServer global index and the corresponding server 225 \param [in] clientIntraComm client joining distribution process. 
226 */ 186 227 void CClientServerMappingDistributed::computeDistributedServerIndex(const boost::unordered_map<size_t,int>& globalIndexOfServer, 187 228 const MPI_Comm& clientIntraComm) 188 229 { 189 computeHashIndex();190 int clientRank;191 MPI_Comm_rank(clientIntraComm,&clientRank);192 193 230 int* sendBuff = new int[nbClient_]; 194 231 int* sendNbIndexBuff = new int[nbClient_]; … … 214 251 { 215 252 int indexClient = std::distance(itbClientHash, itClientHash)-1; 216 if (clientRank == indexClient)253 if (clientRank_ == indexClient) 217 254 { 218 255 globalIndexToServerMapping_.insert(std::make_pair<size_t,int>(it->first, it->second)); … … 228 265 } 229 266 230 231 // for (boost::unordered_map<size_t,int>::const_iterator it = globalIndexToServerMapping_.begin(); 232 // it != globalIndexToServerMapping_.end(); ++it) 233 // std::cout << " " << it->first << ":" << it->second; 234 // std::cout << "First Number = " << globalIndexToServerMapping_.size() << std::endl; 235 236 267 // Calculate from how many clients each client receive message. 
237 268 int* recvBuff = new int[nbClient_]; 238 269 MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm); 239 270 int recvNbClient = recvBuff[clientRank_]; 271 272 // Calculate size of buffer for receiving message 240 273 int* recvNbIndexBuff = new int[nbClient_]; 241 274 MPI_Allreduce(sendNbIndexBuff, recvNbIndexBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm); 242 243 MPI_Status statusIndexGlobal, statusIndexServer; 244 int flag, countIndexGlobal_ = 0, countIndexServer_ = 0; 245 246 247 std::map<int, MPI_Request>::iterator itRequestIndexGlobal, itRequestIndexServer; 248 std::map<int, int> countBuffIndexServer, countBuffIndexGlobal; 249 std::vector<int> processedList; 250 251 252 bool isFinished=false; 253 int recvNbIndexCount = recvNbIndexBuff[clientRank]; 254 int recvNbClient = recvBuff[clientRank]; 275 int recvNbIndexCount = recvNbIndexBuff[clientRank_]; 255 276 unsigned long* recvIndexGlobalBuff = new unsigned long[recvNbIndexCount]; 256 277 int* recvIndexServerBuff = new int[recvNbIndexCount]; 257 278 279 // If a client holds information about global index and servers which don't belong to it, 280 // it will send a message to the correct clients. 
281 // Contents of the message are global index and its corresponding server index 258 282 std::list<MPI_Request> sendRequest; 259 283 std::map<int, std::vector<size_t> >::iterator itGlobal = client2ClientIndexGlobal.begin(), 260 284 iteGlobal = client2ClientIndexGlobal.end(); 261 for ( ; 285 for ( ;itGlobal != iteGlobal; ++itGlobal) 262 286 sendIndexGlobalToClients(itGlobal->first, itGlobal->second, clientIntraComm, sendRequest); 263 287 std::map<int, std::vector<int> >::iterator itServer = client2ClientIndexServer.begin(), … 266 290 sendIndexServerToClients(itServer->first, itServer->second, clientIntraComm, sendRequest); 267 291 268 resetRequestAndCount(); 292 std::map<int, MPI_Request>::iterator itRequestIndexGlobal, itRequestIndexServer; 293 std::map<int, int> countBuffIndexServer, countBuffIndexGlobal; 294 std::vector<int> processedList; 295 296 bool isFinished = (0 == recvNbClient) ? true : false; 297 298 // Just to make sure before listening message, all counting index and receiving request have already been reset 299 resetReceivingRequestAndCount(); 300 301 // Now each client tries to listen to demand from others. 302 // If they have message, it processes: pushing global index and corresponding server to its map 269 303 while (!isFinished || (!sendRequest.empty())) 270 304 { … 304 338 --recvNbClient; 305 339 } 306 307 340 } 308 341 … 324 357 delete [] recvIndexGlobalBuff; 325 358 delete [] recvIndexServerBuff; 326 327 // for (boost::unordered_map<size_t,int>::const_iterator it = globalIndexToServerMapping_.begin(); 328 // it != globalIndexToServerMapping_.end(); ++it) 329 // std::cout << " " << it->first << ":" << it->second; 330 // std::cout << "Number = " << globalIndexToServerMapping_.size() << std::endl; 331 332 } 333 359 } 360 361 /*! 362 Probe and receive message containing global index from other clients. 363 Each client can send a message of global index to other clients to fulfill their maps. 
364 Each client probes message from its queue then if the message is ready, it will be put into the receiving buffer 365 \param [in] recvIndexGlobalBuff buffer dedicated for receiving global index 366 \param [in] recvNbIndexCount size of the buffer 367 */ 334 368 void CClientServerMappingDistributed::probeIndexGlobalMessageFromClients(unsigned long* recvIndexGlobalBuff, int recvNbIndexCount) 335 369 { … 350 384 } 351 385 386 /*! 387 Probe and receive message containing server index from other clients. 388 Each client can send a message of server index to other clients to fulfill their maps. 389 Each client probes message from its queue then if the message is ready, it will be put into the receiving buffer 390 \param [in] recvIndexServerBuff buffer dedicated for receiving server index 391 \param [in] recvNbIndexCount size of the buffer 392 */ 352 393 void CClientServerMappingDistributed::probeIndexServerMessageFromClients(int* recvIndexServerBuff, int recvNbIndexCount) 353 394 { … 369 410 } 370 411 371 412 /*! 
413 Send message containing global index to clients 414 \param [in] clientDestRank rank of destination client 415 \param [in] indexGlobal global index to send 416 \param [in] clientIntraComm communication group of client 417 \param [in] requestSendIndexGlobal list of sending request 418 */ 372 419 void CClientServerMappingDistributed::sendIndexGlobalToClients(int clientDestRank, std::vector<size_t>& indexGlobal, 373 420 const MPI_Comm& clientIntraComm, … … 378 425 MPI_Isend(&(indexGlobal)[0], (indexGlobal).size(), MPI_UNSIGNED_LONG, 379 426 clientDestRank, 15, clientIntraComm, &(requestSendIndexGlobal.back())); 380 381 // int nbSendClient = indexGlobal.size(); 382 // std::map<int, std::vector<size_t> >::iterator 383 // itClient2ClientIndexGlobal = indexGlobal.begin(), 384 // iteClient2ClientIndexGlobal = indexGlobal.end(); 385 // 386 // for (; itClient2ClientIndexGlobal != iteClient2ClientIndexGlobal; 387 // ++itClient2ClientIndexGlobal) 388 // { 389 // MPI_Request request; 390 // requestSendIndexGlobal.push_back(request); 391 // MPI_Isend(&(itClient2ClientIndexGlobal->second)[0], 392 // (itClient2ClientIndexGlobal->second).size(), 393 // MPI_UNSIGNED_LONG, 394 // itClient2ClientIndexGlobal->first, 395 // 15, clientIntraComm, &(requestSendIndexGlobal.back())); 396 // } 397 398 } 399 427 } 428 429 /*! 
430 Send message containing server index to clients 431 \param [in] clientDestRank rank of destination client 432 \param [in] indexServer server index to send 433 \param [in] clientIntraComm communication group of client 434 \param [in] requestSendIndexServer list of sending request 435 */ 400 436 void CClientServerMappingDistributed::sendIndexServerToClients(int clientDestRank, std::vector<int>& indexServer, 401 437 const MPI_Comm& clientIntraComm, … … 406 442 MPI_Isend(&(indexServer)[0], (indexServer).size(), MPI_INT, 407 443 clientDestRank, 12, clientIntraComm, &(requestSendIndexServer.back())); 408 409 // int nbSendClient = indexServer.size(); 410 // std::map<int, std::vector<int> >::iterator 411 // itClient2ClientIndexServer = indexServer.begin(), 412 // iteClient2ClientIndexServer = indexServer.end(); 413 414 // for (; itClient2ClientIndexServer != iteClient2ClientIndexServer; 415 // ++itClient2ClientIndexServer) 416 // { 417 // MPI_Request request; 418 // requestSendIndexServer.push_back(request); 419 // MPI_Isend(&(itClient2ClientIndexServer->second)[0], 420 // (itClient2ClientIndexServer->second).size(), 421 // MPI_INT, 422 // itClient2ClientIndexServer->first, 423 // 12, clientIntraComm, &(requestSendIndexServer.back())); 424 // 425 // } 426 } 427 444 } 445 446 /*! 447 Verify status of sending request 448 \param [in] sendRequest sending request to verify 449 */ 428 450 void CClientServerMappingDistributed::testSendRequest(std::list<MPI_Request>& sendRequest) 429 451 { … … 441 463 if (true == flag) 442 464 { 443 --sizeListRequest;444 465 isErased = true; 445 466 break; … … 451 472 } 452 473 474 /*! 475 Process the received request. 
Pushing global index and server index into map 476 \param[in] buffIndexGlobal pointer to the beginning of buffer containing global index 477 \param[in] buffIndexServer pointer to the beginning of buffer containing server index 478 \param[in] count size of received message 479 */ 453 480 void CClientServerMappingDistributed::processReceivedRequest(unsigned long* buffIndexGlobal, int* buffIndexServer, int count) 454 481 { … 457 484 } 458 485 486 /*! 487 Compute size of message containing global index 488 \param[in] requestRecv request of message 489 */ 459 490 int CClientServerMappingDistributed::computeBuffCountIndexGlobal(MPI_Request& requestRecv) 460 491 { … 471 502 } 472 503 504 /*! 505 Compute size of message containing server index 506 \param[in] requestRecv request of message 507 */ 473 508 int CClientServerMappingDistributed::computeBuffCountIndexServer(MPI_Request& requestRecv) 474 509 { … 485 520 } 486 521 487 void CClientServerMappingDistributed::resetRequestAndCount() 522 /*! 523 Reset all receiving request map and counter 524 */ 525 void CClientServerMappingDistributed::resetReceivingRequestAndCount() 488 526 { 489 527 countIndexGlobal_ = countIndexServer_ = 0;
Note: See TracChangeset
for help on using the changeset viewer.