Changeset 1033 for XIOS/trunk


Ignore:
Timestamp:
01/24/17 16:15:50 (7 years ago)
Author:
rlacroix
Message:

Make the XIOS server(s) completely non-blocking.

This fixes some deadlocks caused by bugs in the communication protocol when using inputs and multiple contexts.

Location:
XIOS/trunk/src
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • XIOS/trunk/src/context_client.cpp

    r988 r1033  
    8585    { 
    8686      list<int> ranks = event.getRanks(); 
     87 
    8788      if (!event.isEmpty()) 
    8889      { 
    8990        list<int> sizes = event.getSizes(); 
    9091 
    91         list<CBufferOut*> buffList = getBuffers(ranks, sizes); 
    92  
    93         event.send(timeLine, sizes, buffList); 
    94  
    95         checkBuffers(ranks); 
    96       } 
    97  
    98       if (isAttachedModeEnabled()) 
    99       { 
    100         waitEvent(ranks); 
    101         CContext::setCurrent(context->getId()); 
     92        // We force the getBuffers call to be non-blocking on the servers 
     93        list<CBufferOut*> buffList; 
     94        bool couldBuffer = getBuffers(ranks, sizes, buffList, !CXios::isClient); 
     95 
     96        if (couldBuffer) 
     97        { 
     98          event.send(timeLine, sizes, buffList); 
     99 
     100          checkBuffers(ranks); 
     101 
     102          if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 
     103          { 
     104            waitEvent(ranks); 
     105            CContext::setCurrent(context->getId()); 
     106          } 
     107        } 
     108        else 
     109        { 
     110          tmpBufferedEvent.ranks = ranks; 
     111          tmpBufferedEvent.sizes = sizes; 
     112 
     113          for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++) 
     114            tmpBufferedEvent.buffers.push_back(new CBufferOut(*it)); 
     115 
     116          event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers); 
     117        } 
    102118      } 
    103119 
    104120      timeLine++; 
     121    } 
     122 
     123    /*! 
     124     * Send the temporarily buffered event (if any). 
     125     * 
     126     * \return true if a temporarily buffered event could be sent, false otherwise  
     127     */ 
     128    bool CContextClient::sendTemporarilyBufferedEvent() 
     129    { 
     130      bool couldSendTmpBufferedEvent = false; 
     131 
     132      if (hasTemporarilyBufferedEvent()) 
     133      { 
     134        list<CBufferOut*> buffList; 
     135        if (getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) // Non-blocking call 
     136        { 
     137          list<CBufferOut*>::iterator it, itBuffer; 
     138 
     139          for (it = tmpBufferedEvent.buffers.begin(), itBuffer = buffList.begin(); it != tmpBufferedEvent.buffers.end(); it++, itBuffer++) 
     140            (*itBuffer)->put((char*)(*it)->start(), (*it)->count()); 
     141 
     142          checkBuffers(tmpBufferedEvent.ranks); 
     143 
     144          tmpBufferedEvent.clear(); 
     145 
     146          couldSendTmpBufferedEvent = true; 
     147        } 
     148      } 
     149 
     150      return couldSendTmpBufferedEvent; 
    105151    } 
    106152 
     
    126172 
    127173    /*! 
    128     Setup buffer for each connection to server and verify their state to put content into them 
    129     \param [in] serverList list of rank of connected server 
    130     \param [in] sizeList size of message corresponding to each connection 
    131     \return List of buffer input which event can be placed 
     174     * Get buffers for each connection to the servers. This function blocks until there is enough room in the buffers unless 
     175     * it is explicitly requested to be non-blocking. 
     176     * 
     177     * \param [in] serverList list of rank of connected server 
     178     * \param [in] sizeList size of message corresponding to each connection 
     179     * \param [out] retBuffers list of buffers that can be used to store an event 
     180     * \param [in] nonBlocking whether this function should be non-blocking 
     181     * \return whether the already allocated buffers could be used 
    132182    */ 
    133     list<CBufferOut*> CContextClient::getBuffers(list<int>& serverList, list<int>& sizeList) 
     183    bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking /*= false*/) 
    134184    { 
    135       list<int>::iterator itServer, itSize; 
     185      list<int>::const_iterator itServer, itSize; 
    136186      list<CClientBuffer*> bufferList; 
    137       map<int,CClientBuffer*>::iterator it; 
     187      map<int,CClientBuffer*>::const_iterator it; 
    138188      list<CClientBuffer*>::iterator itBuffer; 
    139       list<CBufferOut*>  retBuffer; 
    140189      bool areBuffersFree; 
    141190 
     
    163212          context->server->listen(); 
    164213        } 
    165       } while (!areBuffersFree); 
     214      } while (!areBuffersFree && !nonBlocking); 
    166215      CTimer::get("Blocking time").suspend(); 
    167216 
    168       for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
    169       { 
    170         retBuffer.push_back((*itBuffer)->getBuffer(*itSize)); 
    171       } 
    172       return retBuffer; 
     217      if (areBuffersFree) 
     218      { 
     219        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
     220          retBuffers.push_back((*itBuffer)->getBuffer(*itSize)); 
     221      } 
     222 
     223      return areBuffersFree; 
    173224   } 
    174225 
     
    303354   { 
    304355     map<int,CClientBuffer*>::iterator itBuff; 
    305      bool stop = true; 
     356     bool stop = false; 
     357 
     358     CTimer::get("Blocking time").resume(); 
     359     while (hasTemporarilyBufferedEvent()) 
     360     { 
     361       checkBuffers(); 
     362       sendTemporarilyBufferedEvent(); 
     363     } 
     364     CTimer::get("Blocking time").suspend(); 
    306365 
    307366     CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); 
     
    317376 
    318377     CTimer::get("Blocking time").resume(); 
    319      while (stop) 
     378     while (!stop) 
    320379     { 
    321380       checkBuffers(); 
    322        stop = false; 
    323        for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop |= itBuff->second->hasPendingRequest(); 
     381       if (hasTemporarilyBufferedEvent()) 
     382         sendTemporarilyBufferedEvent(); 
     383 
     384       stop = true; 
     385       for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest(); 
    324386     } 
    325387     CTimer::get("Blocking time").suspend(); 
  • XIOS/trunk/src/context_client.hpp

    r988 r1033  
    3131      // Send event to server 
    3232      void sendEvent(CEventClient& event); 
     33      bool sendTemporarilyBufferedEvent(); 
    3334      void waitEvent(list<int>& ranks); 
    3435 
    35       // Functions relates to set/get buffers 
    36       list<CBufferOut*> getBuffers(list<int>& serverlist, list<int>& sizeList); 
     36      // Functions to set/get buffers 
     37      bool getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false); 
    3738      void newBuffer(int rank); 
    3839      bool checkBuffers(list<int>& ranks); 
     
    4647 
    4748      bool isAttachedModeEnabled() const; 
     49 
     50      bool hasTemporarilyBufferedEvent() const { return !tmpBufferedEvent.isEmpty(); }; 
    4851 
    4952      // Close and finalize context client 
     
    7679      StdSize maxBufferedEvents; 
    7780 
     81      struct { 
     82        std::list<int> ranks, sizes; 
     83        std::list<CBufferOut*> buffers; 
     84 
     85        bool isEmpty() const { return ranks.empty(); }; 
     86        void clear() { 
     87          ranks.clear(); 
     88          sizes.clear(); 
     89 
     90          for (std::list<CBufferOut*>::iterator it = buffers.begin(); it != buffers.end(); it++) 
     91            delete *it; 
     92 
     93          buffers.clear(); 
     94        }; 
     95      } tmpBufferedEvent; //! Event temporarily buffered (used only on the server) 
     96 
    7897      //! Context for server (Only used in attached mode) 
    7998      CContext* parentServer; 
  • XIOS/trunk/src/context_server.cpp

    r998 r1033  
    5757  } 
    5858 
    59   bool CContextServer::eventLoop(void) 
     59  bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/) 
    6060  { 
    6161    listen(); 
    6262    checkPendingRequest(); 
    63     processEvents(); 
     63    if (enableEventsProcessing) 
     64      processEvents(); 
    6465    return finished; 
    6566  } 
  • XIOS/trunk/src/context_server.hpp

    r697 r1033  
    1515 
    1616    CContextServer(CContext* parent,MPI_Comm intraComm,MPI_Comm interComm) ; 
    17     bool eventLoop(void) ; 
     17    bool eventLoop(bool enableEventsProcessing = true); 
    1818    void listen(void) ; 
    1919    void checkPendingRequest(void) ; 
  • XIOS/trunk/src/node/context.cpp

    r1028 r1033  
    339339   } 
    340340 
    341    //! Server side: Put server into a loop in order to listen message from client 
    342    bool CContext::eventLoop(void) 
    343    { 
    344      return server->eventLoop(); 
    345    } 
    346  
    347341   //! Try to send the buffers and receive possible answers 
    348342   bool CContext::checkBuffersAndListen(void) 
    349343   { 
    350344     client->checkBuffers(); 
    351      return server->eventLoop(); 
     345 
     346     bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent(); 
     347     if (hasTmpBufferedEvent) 
     348       hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent(); 
     349 
     350     // Don't process events if there is a temporarily buffered event 
     351     return server->eventLoop(!hasTmpBufferedEvent); 
    352352   } 
    353353 
  • XIOS/trunk/src/node/context.hpp

    r917 r1033  
    9393 
    9494         // Put sever or client into loop state 
    95          bool eventLoop(void); 
    96  
    9795         bool checkBuffersAndListen(void); 
    9896 
Note: See TracChangeset for help on using the changeset viewer.