Changeset 1054 for XIOS/dev/dev_olga/src/context_client.cpp
- Timestamp:
- 02/17/17 19:51:36 (7 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_olga/src/context_client.cpp
r1021 r1054 84 84 { 85 85 list<int> ranks = event.getRanks(); 86 86 87 if (!event.isEmpty()) 87 88 { 88 89 list<int> sizes = event.getSizes(); 89 90 90 list<CBufferOut*> buffList = getBuffers(ranks, sizes); 91 92 event.send(timeLine, sizes, buffList); 93 94 checkBuffers(ranks); 95 } 96 97 if (isAttachedModeEnabled()) 98 { 99 waitEvent(ranks); 100 CContext::setCurrent(context->getId()); 91 // We force the getBuffers call to be non-blocking on the servers 92 list<CBufferOut*> buffList; 93 bool couldBuffer = getBuffers(ranks, sizes, buffList, !CXios::isClient); 94 // bool couldBuffer = getBuffers(ranks, sizes, buffList, CXios::isServer); 95 96 if (couldBuffer) 97 { 98 event.send(timeLine, sizes, buffList); 99 100 checkBuffers(ranks); 101 102 if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 103 { 104 waitEvent(ranks); 105 CContext::setCurrent(context->getId()); 106 } 107 } 108 else 109 { 110 tmpBufferedEvent.ranks = ranks; 111 tmpBufferedEvent.sizes = sizes; 112 113 for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++) 114 tmpBufferedEvent.buffers.push_back(new CBufferOut(*it)); 115 116 event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers); 117 } 101 118 } 102 119 103 120 timeLine++; 121 } 122 123 /*! 124 * Send the temporarily buffered event (if any). 125 * 126 * \return true if a temporarily buffered event could be sent, false otherwise 127 */ 128 bool CContextClient::sendTemporarilyBufferedEvent() 129 { 130 bool couldSendTmpBufferedEvent = false; 131 132 if (hasTemporarilyBufferedEvent()) 133 { 134 list<CBufferOut*> buffList; 135 if (getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) // Non-blocking call 136 { 137 list<CBufferOut*>::iterator it, itBuffer; 138 139 for (it = tmpBufferedEvent.buffers.begin(), itBuffer = buffList.begin(); it != tmpBufferedEvent.buffers.end(); it++, itBuffer++) 140 (*itBuffer)->put((char*)(*it)->start(), (*it)->count()); 141 142 checkBuffers(tmpBufferedEvent.ranks); 143 144 tmpBufferedEvent.clear(); 145 146 couldSendTmpBufferedEvent = true; 147 } 148 } 149 150 return couldSendTmpBufferedEvent; 104 151 } 105 152 … … 124 171 } 125 172 126 /*! 127 Setup buffer for each connection to server and verify their state to put content into them 128 \param [in] serverList list of rank of connected server 129 \param [in] sizeList size of message corresponding to each connection 130 \return List of buffer input which event can be placed 131 */ 132 list<CBufferOut*> CContextClient::getBuffers(list<int>& serverList, list<int>& sizeList) 133 { 134 list<int>::iterator itServer, itSize; 173 174 /*! 175 * Get buffers for each connection to the servers. This function blocks until there is enough room in the buffers unless 176 * it is explicitly requested to be non-blocking. 177 * 178 * \param [in] serverList list of rank of connected server 179 * \param [in] sizeList size of message corresponding to each connection 180 * \param [out] retBuffers list of buffers that can be used to store an event 181 * \param [in] nonBlocking whether this function should be non-blocking 182 * \return whether the already allocated buffers could be used 183 */ 184 bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking /*= false*/) 185 { 186 list<int>::const_iterator itServer, itSize; 135 187 list<CClientBuffer*> bufferList; 136 map<int,CClientBuffer*>:: iterator it;188 map<int,CClientBuffer*>::const_iterator it; 137 189 list<CClientBuffer*>::iterator itBuffer; 138 list<CBufferOut*> retBuffer;139 190 bool areBuffersFree; 140 191 … … 160 211 { 161 212 checkBuffers(); 162 context->server->listen(); 163 } 164 } while (!areBuffersFree); 213 // if (?) 214 // { 215 // for (int i = 0; i < context->serverPrimServer.size(); ++i) 216 // context->serverPrimServer[i]->listen(); 217 // } 218 // else 219 context->server->listen(); 220 } 221 } while (!areBuffersFree && !nonBlocking); 165 222 CTimer::get("Blocking time").suspend(); 166 223 167 for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 168 { 169 retBuffer.push_back((*itBuffer)->getBuffer(*itSize)); 170 } 171 return retBuffer; 224 if (areBuffersFree) 225 { 226 for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 227 retBuffers.push_back((*itBuffer)->getBuffer(*itSize)); 228 } 229 230 return areBuffersFree; 172 231 } 173 232 … … 299 358 Finalize context client and do some reports 300 359 */ 301 void CContextClient::finalize(void) 302 { 303 map<int,CClientBuffer*>::iterator itBuff; 304 bool stop = true; 305 306 CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); 307 if (isServerLeader()) 308 { 309 CMessage msg; 310 const std::list<int>& ranks = getRanksServerLeader(); 311 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 312 event.push(*itRank, 1, msg); 313 sendEvent(event); 314 } 315 else sendEvent(event); 316 317 CTimer::get("Blocking time").resume(); 318 while (stop) 319 { 320 checkBuffers(); 321 stop = false; 322 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop |= itBuff->second->hasPendingRequest(); 323 } 324 CTimer::get("Blocking time").suspend(); 325 326 std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 327 iteMap = mapBufferSize_.end(), itMap; 328 StdSize totalBuf = 0; 329 for (itMap = itbMap; itMap != iteMap; ++itMap) 330 { 331 report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl 332 << " +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl; 333 totalBuf += itMap->second; 334 } 335 report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; 336 337 releaseBuffers(); 338 } 360 void CContextClient::finalize(void) 361 { 362 map<int,CClientBuffer*>::iterator itBuff; 363 bool stop = false; 364 365 CTimer::get("Blocking time").resume(); 366 while (hasTemporarilyBufferedEvent()) 367 { 368 checkBuffers(); 369 sendTemporarilyBufferedEvent(); 370 } 371 CTimer::get("Blocking time").suspend(); 372 373 CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); 374 if (isServerLeader()) 375 { 376 CMessage msg; 377 const std::list<int>& ranks = getRanksServerLeader(); 378 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 379 event.push(*itRank, 1, msg); 380 sendEvent(event); 381 } 382 else sendEvent(event); 383 384 CTimer::get("Blocking time").resume(); 385 while (!stop) 386 { 387 checkBuffers(); 388 if (hasTemporarilyBufferedEvent()) 389 sendTemporarilyBufferedEvent(); 390 391 stop = true; 392 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest(); 393 } 394 CTimer::get("Blocking time").suspend(); 395 396 std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 397 iteMap = mapBufferSize_.end(), itMap; 398 StdSize totalBuf = 0; 399 for (itMap = itbMap; itMap != iteMap; ++itMap) 400 { 401 report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl 402 << " +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl; 403 totalBuf += itMap->second; 404 } 405 report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; 406 407 releaseBuffers(); 408 } 339 409 }
Note: See TracChangeset
for help on using the changeset viewer.