Changeset 1033
- Timestamp:
- 01/24/17 16:15:50 (8 years ago)
- Location:
- XIOS/trunk/src
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/trunk/src/context_client.cpp
r988 r1033 85 85 { 86 86 list<int> ranks = event.getRanks(); 87 87 88 if (!event.isEmpty()) 88 89 { 89 90 list<int> sizes = event.getSizes(); 90 91 91 list<CBufferOut*> buffList = getBuffers(ranks, sizes); 92 93 event.send(timeLine, sizes, buffList); 94 95 checkBuffers(ranks); 96 } 97 98 if (isAttachedModeEnabled()) 99 { 100 waitEvent(ranks); 101 CContext::setCurrent(context->getId()); 92 // We force the getBuffers call to be non-blocking on the servers 93 list<CBufferOut*> buffList; 94 bool couldBuffer = getBuffers(ranks, sizes, buffList, !CXios::isClient); 95 96 if (couldBuffer) 97 { 98 event.send(timeLine, sizes, buffList); 99 100 checkBuffers(ranks); 101 102 if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 103 { 104 waitEvent(ranks); 105 CContext::setCurrent(context->getId()); 106 } 107 } 108 else 109 { 110 tmpBufferedEvent.ranks = ranks; 111 tmpBufferedEvent.sizes = sizes; 112 113 for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++) 114 tmpBufferedEvent.buffers.push_back(new CBufferOut(*it)); 115 116 event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers); 117 } 102 118 } 103 119 104 120 timeLine++; 121 } 122 123 /*! 124 * Send the temporarily buffered event (if any). 125 * 126 * \return true if a temporarily buffered event could be sent, false otherwise 127 */ 128 bool CContextClient::sendTemporarilyBufferedEvent() 129 { 130 bool couldSendTmpBufferedEvent = false; 131 132 if (hasTemporarilyBufferedEvent()) 133 { 134 list<CBufferOut*> buffList; 135 if (getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) // Non-blocking call 136 { 137 list<CBufferOut*>::iterator it, itBuffer; 138 139 for (it = tmpBufferedEvent.buffers.begin(), itBuffer = buffList.begin(); it != tmpBufferedEvent.buffers.end(); it++, itBuffer++) 140 (*itBuffer)->put((char*)(*it)->start(), (*it)->count()); 141 142 checkBuffers(tmpBufferedEvent.ranks); 143 144 tmpBufferedEvent.clear(); 145 146 couldSendTmpBufferedEvent = true; 147 } 148 } 149 150 return couldSendTmpBufferedEvent; 105 151 } 106 152 … … 126 172 127 173 /*! 128 Setup buffer for each connection to server and verify their state to put content into them 129 \param [in] serverList list of rank of connected server 130 \param [in] sizeList size of message corresponding to each connection 131 \return List of buffer input which event can be placed 174 * Get buffers for each connection to the servers. This function blocks until there is enough room in the buffers unless 175 * it is explicitly requested to be non-blocking. 176 * 177 * \param [in] serverList list of rank of connected server 178 * \param [in] sizeList size of message corresponding to each connection 179 * \param [out] retBuffers list of buffers that can be used to store an event 180 * \param [in] nonBlocking whether this function should be non-blocking 181 * \return whether the already allocated buffers could be used 132 182 */ 133 list<CBufferOut*> CContextClient::getBuffers(list<int>& serverList, list<int>& sizeList)183 bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking /*= false*/) 134 184 { 135 list<int>:: iterator itServer, itSize;185 list<int>::const_iterator itServer, itSize; 136 186 list<CClientBuffer*> bufferList; 137 map<int,CClientBuffer*>:: iterator it;187 map<int,CClientBuffer*>::const_iterator it; 138 188 list<CClientBuffer*>::iterator itBuffer; 139 list<CBufferOut*> retBuffer;140 189 bool areBuffersFree; 141 190 … … 163 212 context->server->listen(); 164 213 } 165 } while (!areBuffersFree );214 } while (!areBuffersFree && !nonBlocking); 166 215 CTimer::get("Blocking time").suspend(); 167 216 168 for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 169 { 170 retBuffer.push_back((*itBuffer)->getBuffer(*itSize)); 171 } 172 return retBuffer; 217 if (areBuffersFree) 218 { 219 for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 220 retBuffers.push_back((*itBuffer)->getBuffer(*itSize)); 221 } 222 223 return areBuffersFree; 173 224 } 174 225 … … 303 354 { 304 355 map<int,CClientBuffer*>::iterator itBuff; 305 bool stop = true; 356 bool stop = false; 357 358 CTimer::get("Blocking time").resume(); 359 while (hasTemporarilyBufferedEvent()) 360 { 361 checkBuffers(); 362 sendTemporarilyBufferedEvent(); 363 } 364 CTimer::get("Blocking time").suspend(); 306 365 307 366 CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); … … 317 376 318 377 CTimer::get("Blocking time").resume(); 319 while ( stop)378 while (!stop) 320 379 { 321 380 checkBuffers(); 322 stop = false; 323 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop |= itBuff->second->hasPendingRequest(); 381 if (hasTemporarilyBufferedEvent()) 382 sendTemporarilyBufferedEvent(); 383 384 stop = true; 385 for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest(); 324 386 } 325 387 CTimer::get("Blocking time").suspend(); -
XIOS/trunk/src/context_client.hpp
r988 r1033 31 31 // Send event to server 32 32 void sendEvent(CEventClient& event); 33 bool sendTemporarilyBufferedEvent(); 33 34 void waitEvent(list<int>& ranks); 34 35 35 // Functions relatesto set/get buffers36 list<CBufferOut*> getBuffers(list<int>& serverlist, list<int>& sizeList);36 // Functions to set/get buffers 37 bool getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false); 37 38 void newBuffer(int rank); 38 39 bool checkBuffers(list<int>& ranks); … … 46 47 47 48 bool isAttachedModeEnabled() const; 49 50 bool hasTemporarilyBufferedEvent() const { return !tmpBufferedEvent.isEmpty(); }; 48 51 49 52 // Close and finalize context client … … 76 79 StdSize maxBufferedEvents; 77 80 81 struct { 82 std::list<int> ranks, sizes; 83 std::list<CBufferOut*> buffers; 84 85 bool isEmpty() const { return ranks.empty(); }; 86 void clear() { 87 ranks.clear(); 88 sizes.clear(); 89 90 for (std::list<CBufferOut*>::iterator it = buffers.begin(); it != buffers.end(); it++) 91 delete *it; 92 93 buffers.clear(); 94 }; 95 } tmpBufferedEvent; //! Event temporarily buffered (used only on the server) 96 78 97 //! Context for server (Only used in attached mode) 79 98 CContext* parentServer; -
XIOS/trunk/src/context_server.cpp
r998 r1033 57 57 } 58 58 59 bool CContextServer::eventLoop( void)59 bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/) 60 60 { 61 61 listen(); 62 62 checkPendingRequest(); 63 processEvents(); 63 if (enableEventsProcessing) 64 processEvents(); 64 65 return finished; 65 66 } -
XIOS/trunk/src/context_server.hpp
r697 r1033 15 15 16 16 CContextServer(CContext* parent,MPI_Comm intraComm,MPI_Comm interComm) ; 17 bool eventLoop( void);17 bool eventLoop(bool enableEventsProcessing = true); 18 18 void listen(void) ; 19 19 void checkPendingRequest(void) ; -
XIOS/trunk/src/node/context.cpp
r1028 r1033 339 339 } 340 340 341 //! Server side: Put server into a loop in order to listen message from client342 bool CContext::eventLoop(void)343 {344 return server->eventLoop();345 }346 347 341 //! Try to send the buffers and receive possible answers 348 342 bool CContext::checkBuffersAndListen(void) 349 343 { 350 344 client->checkBuffers(); 351 return server->eventLoop(); 345 346 bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent(); 347 if (hasTmpBufferedEvent) 348 hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent(); 349 350 // Don't process events if there is a temporarily buffered event 351 return server->eventLoop(!hasTmpBufferedEvent); 352 352 } 353 353 -
XIOS/trunk/src/node/context.hpp
r917 r1033 93 93 94 94 // Put sever or client into loop state 95 bool eventLoop(void);96 97 95 bool checkBuffersAndListen(void); 98 96
Note: See TracChangeset
for help on using the changeset viewer.