Ignore:
Timestamp:
02/17/17 19:51:36 (7 years ago)
Author:
oabramkina
Message:

dev: intermediate commit.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_olga/src/context_client.cpp

    r1021 r1054  
    8484    { 
    8585      list<int> ranks = event.getRanks(); 
     86 
    8687      if (!event.isEmpty()) 
    8788      { 
    8889        list<int> sizes = event.getSizes(); 
    8990 
    90         list<CBufferOut*> buffList = getBuffers(ranks, sizes); 
    91  
    92         event.send(timeLine, sizes, buffList); 
    93  
    94         checkBuffers(ranks); 
    95       } 
    96  
    97       if (isAttachedModeEnabled()) 
    98       { 
    99         waitEvent(ranks); 
    100         CContext::setCurrent(context->getId()); 
     91        // We force the getBuffers call to be non-blocking on the servers 
     92        list<CBufferOut*> buffList; 
     93        bool couldBuffer = getBuffers(ranks, sizes, buffList, !CXios::isClient); 
     94//        bool couldBuffer = getBuffers(ranks, sizes, buffList, CXios::isServer); 
     95 
     96        if (couldBuffer) 
     97        { 
     98          event.send(timeLine, sizes, buffList); 
     99 
     100          checkBuffers(ranks); 
     101 
     102          if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 
     103          { 
     104            waitEvent(ranks); 
     105            CContext::setCurrent(context->getId()); 
     106          } 
     107        } 
     108        else 
     109        { 
     110          tmpBufferedEvent.ranks = ranks; 
     111          tmpBufferedEvent.sizes = sizes; 
     112 
     113          for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++) 
     114            tmpBufferedEvent.buffers.push_back(new CBufferOut(*it)); 
     115 
     116          event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers); 
     117        } 
    101118      } 
    102119 
    103120      timeLine++; 
     121    } 
     122 
     123    /*! 
     124     * Send the temporarily buffered event (if any). 
     125     * 
     126     * \return true if a temporarily buffered event could be sent, false otherwise 
     127     */ 
     128    bool CContextClient::sendTemporarilyBufferedEvent() 
     129    { 
     130      bool couldSendTmpBufferedEvent = false; 
     131 
     132      if (hasTemporarilyBufferedEvent()) 
     133      { 
     134        list<CBufferOut*> buffList; 
     135        if (getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) // Non-blocking call 
     136        { 
     137          list<CBufferOut*>::iterator it, itBuffer; 
     138 
     139          for (it = tmpBufferedEvent.buffers.begin(), itBuffer = buffList.begin(); it != tmpBufferedEvent.buffers.end(); it++, itBuffer++) 
     140            (*itBuffer)->put((char*)(*it)->start(), (*it)->count()); 
     141 
     142          checkBuffers(tmpBufferedEvent.ranks); 
     143 
     144          tmpBufferedEvent.clear(); 
     145 
     146          couldSendTmpBufferedEvent = true; 
     147        } 
     148      } 
     149 
     150      return couldSendTmpBufferedEvent; 
    104151    } 
    105152 
     
    124171    } 
    125172 
    126     /*! 
    127     Setup buffer for each connection to server and verify their state to put content into them 
    128     \param [in] serverList list of rank of connected server 
    129     \param [in] sizeList size of message corresponding to each connection 
    130     \return List of buffer input which event can be placed 
    131     */ 
    132     list<CBufferOut*> CContextClient::getBuffers(list<int>& serverList, list<int>& sizeList) 
    133     { 
    134       list<int>::iterator itServer, itSize; 
     173 
     174    /*! 
     175     * Get buffers for each connection to the servers. This function blocks until there is enough room in the buffers unless 
     176     * it is explicitly requested to be non-blocking. 
     177     * 
     178     * \param [in] serverList list of rank of connected server 
     179     * \param [in] sizeList size of message corresponding to each connection 
     180     * \param [out] retBuffers list of buffers that can be used to store an event 
     181     * \param [in] nonBlocking whether this function should be non-blocking 
     182     * \return whether the already allocated buffers could be used 
     183    */ 
     184    bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking /*= false*/) 
     185    { 
     186      list<int>::const_iterator itServer, itSize; 
    135187      list<CClientBuffer*> bufferList; 
    136       map<int,CClientBuffer*>::iterator it; 
     188      map<int,CClientBuffer*>::const_iterator it; 
    137189      list<CClientBuffer*>::iterator itBuffer; 
    138       list<CBufferOut*>  retBuffer; 
    139190      bool areBuffersFree; 
    140191 
     
    160211        { 
    161212          checkBuffers(); 
    162           context->server->listen(); 
    163         } 
    164       } while (!areBuffersFree); 
     213//          if (?) 
     214//          { 
     215//            for (int i = 0; i < context->serverPrimServer.size(); ++i) 
     216//              context->serverPrimServer[i]->listen(); 
     217//          } 
     218//          else 
     219            context->server->listen(); 
     220        } 
     221      } while (!areBuffersFree && !nonBlocking); 
    165222      CTimer::get("Blocking time").suspend(); 
    166223 
    167       for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
    168       { 
    169         retBuffer.push_back((*itBuffer)->getBuffer(*itSize)); 
    170       } 
    171       return retBuffer; 
     224      if (areBuffersFree) 
     225      { 
     226        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
     227          retBuffers.push_back((*itBuffer)->getBuffer(*itSize)); 
     228      } 
     229 
     230      return areBuffersFree; 
    172231   } 
    173232 
     
    299358   Finalize context client and do some reports 
    300359   */ 
    301    void CContextClient::finalize(void) 
    302    { 
    303      map<int,CClientBuffer*>::iterator itBuff; 
    304      bool stop = true; 
    305  
    306      CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); 
    307      if (isServerLeader()) 
    308      { 
    309        CMessage msg; 
    310        const std::list<int>& ranks = getRanksServerLeader(); 
    311        for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
    312          event.push(*itRank, 1, msg); 
    313        sendEvent(event); 
    314      } 
    315      else sendEvent(event); 
    316  
    317      CTimer::get("Blocking time").resume(); 
    318      while (stop) 
    319      { 
    320        checkBuffers(); 
    321        stop = false; 
    322        for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop |= itBuff->second->hasPendingRequest(); 
    323      } 
    324      CTimer::get("Blocking time").suspend(); 
    325  
    326      std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 
    327                                            iteMap = mapBufferSize_.end(), itMap; 
    328      StdSize totalBuf = 0; 
    329      for (itMap = itbMap; itMap != iteMap; ++itMap) 
    330      { 
    331        report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl 
    332                   << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl; 
    333        totalBuf += itMap->second; 
    334      } 
    335      report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; 
    336  
    337      releaseBuffers(); 
    338    } 
     360  void CContextClient::finalize(void) 
     361  { 
     362    map<int,CClientBuffer*>::iterator itBuff; 
     363    bool stop = false; 
     364 
     365    CTimer::get("Blocking time").resume(); 
     366    while (hasTemporarilyBufferedEvent()) 
     367    { 
     368      checkBuffers(); 
     369      sendTemporarilyBufferedEvent(); 
     370    } 
     371    CTimer::get("Blocking time").suspend(); 
     372 
     373    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); 
     374    if (isServerLeader()) 
     375    { 
     376      CMessage msg; 
     377      const std::list<int>& ranks = getRanksServerLeader(); 
     378      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 
     379        event.push(*itRank, 1, msg); 
     380      sendEvent(event); 
     381    } 
     382    else sendEvent(event); 
     383 
     384    CTimer::get("Blocking time").resume(); 
     385    while (!stop) 
     386    { 
     387      checkBuffers(); 
     388      if (hasTemporarilyBufferedEvent()) 
     389        sendTemporarilyBufferedEvent(); 
     390 
     391      stop = true; 
     392      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest(); 
     393    } 
     394    CTimer::get("Blocking time").suspend(); 
     395 
     396    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 
     397                                          iteMap = mapBufferSize_.end(), itMap; 
     398    StdSize totalBuf = 0; 
     399    for (itMap = itbMap; itMap != iteMap; ++itMap) 
     400    { 
     401      report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl 
     402                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl; 
     403      totalBuf += itMap->second; 
     404    } 
     405    report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; 
     406 
     407    releaseBuffers(); 
     408  } 
    339409} 
Note: See TracChangeset for help on using the changeset viewer.