Changeset 1460 for XIOS/dev/branch_openmp/src/context_client.cpp
- Timestamp:
- 03/22/18 10:43:20 (6 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/branch_openmp/src/context_client.cpp
r1356 r1460 11 11 #include "timer.hpp" 12 12 #include "cxios.hpp" 13 #include "server.hpp" 13 14 using namespace ep_lib; 14 15 … … 19 20 \param [in] intraComm_ communicator of group client 20 21 \param [in] interComm_ communicator of group server 21 \cxtSer [in] cxtSer Pointer to context of server side. (It is only used on case of attached mode)22 \cxtSer [in] cxtSer Pointer to context of server side. (It is only used in case of attached mode). 22 23 */ 23 24 CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer) … … 40 41 else MPI_Comm_size(interComm, &serverSize); 41 42 43 computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader); 44 45 timeLine = 0; 46 } 47 48 void CContextClient::computeLeader(int clientRank, int clientSize, int serverSize, 49 std::list<int>& rankRecvLeader, 50 std::list<int>& rankRecvNotLeader) 51 { 52 if ((0 == clientSize) || (0 == serverSize)) return; 53 42 54 if (clientSize < serverSize) 43 55 { … … 55 67 56 68 for (int i = 0; i < serverByClient; i++) 57 rank sServerLeader.push_back(rankStart + i);58 59 rank sServerNotLeader.resize(0);69 rankRecvLeader.push_back(rankStart + i); 70 71 rankRecvNotLeader.resize(0); 60 72 } 61 73 else … … 67 79 { 68 80 if (clientRank % (clientByServer + 1) == 0) 69 rank sServerLeader.push_back(clientRank / (clientByServer + 1));81 rankRecvLeader.push_back(clientRank / (clientByServer + 1)); 70 82 else 71 rank sServerNotLeader.push_back(clientRank / (clientByServer + 1));83 rankRecvNotLeader.push_back(clientRank / (clientByServer + 1)); 72 84 } 73 85 else … … 75 87 int rank = clientRank - (clientByServer + 1) * remain; 76 88 if (rank % clientByServer == 0) 77 rank sServerLeader.push_back(remain + rank / clientByServer);89 rankRecvLeader.push_back(remain + rank / clientByServer); 78 90 else 79 ranksServerNotLeader.push_back(remain + rank / clientByServer); 80 } 81 } 82 83 timeLine = 0; 91 rankRecvNotLeader.push_back(remain + rank / clientByServer); 92 } 93 } 84 94 } 85 95 … … 92 102 list<int> ranks = event.getRanks(); 93 103 104 if (CXios::checkEventSync) 105 { 106 int typeId, classId, typeId_in, classId_in, timeLine_out; 107 typeId_in=event.getTypeId() ; 108 classId_in=event.getClassId() ; 109 MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; 110 MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ; 111 MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ; 112 if (typeId/clientSize!=event.getTypeId() || classId/clientSize!=event.getClassId() || timeLine_out/clientSize!=timeLine) 113 { 114 ERROR("void CContextClient::sendEvent(CEventClient& event)", 115 << "Event are not coherent between client."); 116 } 117 } 118 94 119 if (!event.isEmpty()) 95 120 { 96 121 list<int> sizes = event.getSizes(); 97 122 98 // We force the getBuffers call to be non-blocking on theservers123 // We force the getBuffers call to be non-blocking on classical servers 99 124 list<CBufferOut*> buffList; 100 bool couldBuffer = getBuffers(ranks, sizes, buffList, !CXios::isClient); 125 bool couldBuffer = getBuffers(ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0) )); 126 // bool couldBuffer = getBuffers(ranks, sizes, buffList, CXios::isServer ); 101 127 102 128 if (couldBuffer) … … 119 145 for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++) 120 146 tmpBufferedEvent.buffers.push_back(new CBufferOut(*it)); 121 147 info(100)<<"DEBUG : temporaly event created : timeline "<<timeLine<<endl ; 122 148 event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers); 123 149 } … … 146 172 (*itBuffer)->put((char*)(*it)->start(), (*it)->count()); 147 173 174 info(100)<<"DEBUG : temporaly event sent "<<endl ; 148 175 checkBuffers(tmpBufferedEvent.ranks); 149 176 … … 187 214 * \return whether the already allocated buffers could be used 188 215 */ 189 bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking /*= false*/) 216 bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, 217 bool nonBlocking /*= false*/) 190 218 { 191 219 list<int>::const_iterator itServer, itSize; … … 216 244 { 217 245 checkBuffers(); 218 context->server->listen(); 246 if (CServer::serverLevel == 0) 247 context->server->listen(); 248 249 else if (CServer::serverLevel == 1) 250 { 251 context->server->listen(); 252 for (int i = 0; i < context->serverPrimServer.size(); ++i) 253 context->serverPrimServer[i]->listen(); 254 CServer::contextEventLoop(false) ; // avoid dead-lock at finalize... 255 } 256 257 else if (CServer::serverLevel == 2) 258 context->server->listen(); 259 219 260 } 220 261 } while (!areBuffersFree && !nonBlocking); 262 221 263 CTimer::get("Blocking time").suspend(); 222 264 … … 257 299 map<int,CClientBuffer*>::iterator itBuff; 258 300 bool pending = false; 259 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) pending |= itBuff->second->checkBuffer(); 301 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 302 pending |= itBuff->second->checkBuffer(); 260 303 return pending; 261 304 } 262 305 263 306 //! Release all buffers 264 void CContextClient::releaseBuffers( void)307 void CContextClient::releaseBuffers() 265 308 { 266 309 map<int,CClientBuffer*>::iterator itBuff; 267 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) delete itBuff->second; 310 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 311 { 312 delete itBuff->second; 313 } 314 buffers.clear(); 268 315 } 269 316 … … 362 409 363 410 /*! 364 Finalize context client and do some reports 365 */ 366 void CContextClient::finalize(void) 367 { 368 map<int,CClientBuffer*>::iterator itBuff; 369 bool stop = false; 370 371 CTimer::get("Blocking time").resume(); 372 while (hasTemporarilyBufferedEvent()) 373 { 374 checkBuffers(); 375 sendTemporarilyBufferedEvent(); 376 } 377 CTimer::get("Blocking time").suspend(); 378 379 CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); 380 if (isServerLeader()) 381 { 382 CMessage msg; 383 const std::list<int>& ranks = getRanksServerLeader(); 384 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 385 event.push(*itRank, 1, msg); 386 sendEvent(event); 387 } 388 else sendEvent(event); 389 390 CTimer::get("Blocking time").resume(); 391 while (!stop) 392 { 393 checkBuffers(); 394 if (hasTemporarilyBufferedEvent()) 395 sendTemporarilyBufferedEvent(); 396 397 stop = true; 398 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest(); 399 } 400 CTimer::get("Blocking time").suspend(); 401 402 std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 403 iteMap = mapBufferSize_.end(), itMap; 404 StdSize totalBuf = 0; 405 for (itMap = itbMap; itMap != iteMap; ++itMap) 406 { 407 //report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl 408 // << " +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl; 409 totalBuf += itMap->second; 410 } 411 //report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; 412 413 releaseBuffers(); 414 } 411 * Finalize context client and do some reports. Function is non-blocking. 412 */ 413 void CContextClient::finalize(void) 414 { 415 map<int,CClientBuffer*>::iterator itBuff; 416 bool stop = false; 417 418 CTimer::get("Blocking time").resume(); 419 while (hasTemporarilyBufferedEvent()) 420 { 421 checkBuffers(); 422 sendTemporarilyBufferedEvent(); 423 } 424 CTimer::get("Blocking time").suspend(); 425 426 CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); 427 if (isServerLeader()) 428 { 429 CMessage msg; 430 const std::list<int>& ranks = getRanksServerLeader(); 431 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 432 { 433 #pragma omp critical (_output) 434 info(100)<<"DEBUG : Sent context Finalize event to rank "<<*itRank<<endl ; 435 event.push(*itRank, 1, msg); 436 } 437 sendEvent(event); 438 } 439 else sendEvent(event); 440 441 CTimer::get("Blocking time").resume(); 442 // while (!stop) 443 { 444 checkBuffers(); 445 if (hasTemporarilyBufferedEvent()) 446 sendTemporarilyBufferedEvent(); 447 448 stop = true; 449 // for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest(); 450 } 451 CTimer::get("Blocking time").suspend(); 452 453 std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 454 iteMap = mapBufferSize_.end(), itMap; 455 456 StdSize totalBuf = 0; 457 for (itMap = itbMap; itMap != iteMap; ++itMap) 458 { 459 #pragma omp critical (_output) 460 report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl 461 << " +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl; 462 totalBuf += itMap->second; 463 } 464 #pragma omp critical (_output) 465 report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; 466 467 //releaseBuffers(); // moved to CContext::finalize() 468 } 469 470 471 /*! 472 */ 473 bool CContextClient::havePendingRequests(void) 474 { 475 bool pending = false; 476 map<int,CClientBuffer*>::iterator itBuff; 477 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 478 pending |= itBuff->second->hasPendingRequest(); 479 return pending; 480 } 481 482 415 483 }
Note: See TracChangeset
for help on using the changeset viewer.