Ignore:
Timestamp:
01/24/17 16:15:50 (4 years ago)
Author:
rlacroix
Message:

Make the XIOS server(s) completely non-blocking.

This fixes some deadlocks caused by bugs in the communication protocol when using inputs and multiple contexts.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/trunk/src/context_client.cpp

    r988 r1033  
    8585    { 
    8686      list<int> ranks = event.getRanks(); 
     87 
    8788      if (!event.isEmpty()) 
    8889      { 
    8990        list<int> sizes = event.getSizes(); 
    9091 
    91         list<CBufferOut*> buffList = getBuffers(ranks, sizes); 
    92  
    93         event.send(timeLine, sizes, buffList); 
    94  
    95         checkBuffers(ranks); 
    96       } 
    97  
    98       if (isAttachedModeEnabled()) 
    99       { 
    100         waitEvent(ranks); 
    101         CContext::setCurrent(context->getId()); 
     92        // We force the getBuffers call to be non-blocking on the servers 
     93        list<CBufferOut*> buffList; 
     94        bool couldBuffer = getBuffers(ranks, sizes, buffList, !CXios::isClient); 
     95 
     96        if (couldBuffer) 
     97        { 
     98          event.send(timeLine, sizes, buffList); 
     99 
     100          checkBuffers(ranks); 
     101 
     102          if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 
     103          { 
     104            waitEvent(ranks); 
     105            CContext::setCurrent(context->getId()); 
     106          } 
     107        } 
     108        else 
     109        { 
     110          tmpBufferedEvent.ranks = ranks; 
     111          tmpBufferedEvent.sizes = sizes; 
     112 
     113          for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++) 
     114            tmpBufferedEvent.buffers.push_back(new CBufferOut(*it)); 
     115 
     116          event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers); 
     117        } 
    102118      } 
    103119 
    104120      timeLine++; 
     121    } 
     122 
     123    /*! 
     124     * Send the temporarily buffered event (if any). 
     125     * 
     126     * \return true if a temporarily buffered event could be sent, false otherwise  
     127     */ 
     128    bool CContextClient::sendTemporarilyBufferedEvent() 
     129    { 
     130      bool couldSendTmpBufferedEvent = false; 
     131 
     132      if (hasTemporarilyBufferedEvent()) 
     133      { 
     134        list<CBufferOut*> buffList; 
     135        if (getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) // Non-blocking call 
     136        { 
     137          list<CBufferOut*>::iterator it, itBuffer; 
     138 
     139          for (it = tmpBufferedEvent.buffers.begin(), itBuffer = buffList.begin(); it != tmpBufferedEvent.buffers.end(); it++, itBuffer++) 
     140            (*itBuffer)->put((char*)(*it)->start(), (*it)->count()); 
     141 
     142          checkBuffers(tmpBufferedEvent.ranks); 
     143 
     144          tmpBufferedEvent.clear(); 
     145 
     146          couldSendTmpBufferedEvent = true; 
     147        } 
     148      } 
     149 
     150      return couldSendTmpBufferedEvent; 
    105151    } 
    106152 
     
    126172 
    127173    /*! 
    128     Setup buffer for each connection to server and verify their state to put content into them 
    129     \param [in] serverList list of rank of connected server 
    130     \param [in] sizeList size of message corresponding to each connection 
    131     \return List of buffer input which event can be placed 
     174     * Get buffers for each connection to the servers. This function blocks until there is enough room in the buffers unless 
     175     * it is explicitly requested to be non-blocking. 
     176     * 
     177     * \param [in] serverList list of rank of connected server 
     178     * \param [in] sizeList size of message corresponding to each connection 
     179     * \param [out] retBuffers list of buffers that can be used to store an event 
     180     * \param [in] nonBlocking whether this function should be non-blocking 
     181     * \return whether the already allocated buffers could be used 
    132182    */ 
    133     list<CBufferOut*> CContextClient::getBuffers(list<int>& serverList, list<int>& sizeList) 
     183    bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking /*= false*/) 
    134184    { 
    135       list<int>::iterator itServer, itSize; 
     185      list<int>::const_iterator itServer, itSize; 
    136186      list<CClientBuffer*> bufferList; 
    137       map<int,CClientBuffer*>::iterator it; 
     187      map<int,CClientBuffer*>::const_iterator it; 
    138188      list<CClientBuffer*>::iterator itBuffer; 
    139       list<CBufferOut*>  retBuffer; 
    140189      bool areBuffersFree; 
    141190 
     
    163212          context->server->listen(); 
    164213        } 
    165       } while (!areBuffersFree); 
     214      } while (!areBuffersFree && !nonBlocking); 
    166215      CTimer::get("Blocking time").suspend(); 
    167216 
    168       for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
    169       { 
    170         retBuffer.push_back((*itBuffer)->getBuffer(*itSize)); 
    171       } 
    172       return retBuffer; 
     217      if (areBuffersFree) 
     218      { 
     219        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
     220          retBuffers.push_back((*itBuffer)->getBuffer(*itSize)); 
     221      } 
     222 
     223      return areBuffersFree; 
    173224   } 
    174225 
     
    303354   { 
    304355     map<int,CClientBuffer*>::iterator itBuff; 
    305      bool stop = true; 
     356     bool stop = false; 
     357 
     358     CTimer::get("Blocking time").resume(); 
     359     while (hasTemporarilyBufferedEvent()) 
     360     { 
     361       checkBuffers(); 
     362       sendTemporarilyBufferedEvent(); 
     363     } 
     364     CTimer::get("Blocking time").suspend(); 
    306365 
    307366     CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); 
     
    317376 
    318377     CTimer::get("Blocking time").resume(); 
    319      while (stop) 
     378     while (!stop) 
    320379     { 
    321380       checkBuffers(); 
    322        stop = false; 
    323        for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop |= itBuff->second->hasPendingRequest(); 
     381       if (hasTemporarilyBufferedEvent()) 
     382         sendTemporarilyBufferedEvent(); 
     383 
     384       stop = true; 
     385       for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest(); 
    324386     } 
    325387     CTimer::get("Blocking time").suspend(); 
Note: See TracChangeset for help on using the changeset viewer.