Timestamp: 03/22/18 10:43:20
Author: yushan
Message: branch_openmp merged with XIOS_DEV_CMIP6@1459

File: 1 edited

  • XIOS/dev/branch_openmp/src/context_client.cpp

--- XIOS/dev/branch_openmp/src/context_client.cpp (revision 1356)
+++ XIOS/dev/branch_openmp/src/context_client.cpp (revision 1460)
@@ -11,4 +11,5 @@
 #include "timer.hpp"
 #include "cxios.hpp"
+#include "server.hpp"
 using namespace ep_lib;
 
     
@@ -19,5 +20,5 @@
     \param [in] intraComm_ communicator of group client
     \param [in] interComm_ communicator of group server
-    \cxtSer [in] cxtSer Pointer to context of server side. (It is only used on case of attached mode)
+    \cxtSer [in] cxtSer Pointer to context of server side. (It is only used in case of attached mode).
     */
     CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
     
@@ -40,4 +41,15 @@
       else  MPI_Comm_size(interComm, &serverSize);
 
+      computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader);
+
+      timeLine = 0;
+    }
+
+    void CContextClient::computeLeader(int clientRank, int clientSize, int serverSize,
+                                       std::list<int>& rankRecvLeader,
+                                       std::list<int>& rankRecvNotLeader)
+    {
+      if ((0 == clientSize) || (0 == serverSize)) return;
+
       if (clientSize < serverSize)
       {
     
@@ -55,7 +67,7 @@
 
         for (int i = 0; i < serverByClient; i++)
-          ranksServerLeader.push_back(rankStart + i);
-
-        ranksServerNotLeader.resize(0);
+          rankRecvLeader.push_back(rankStart + i);
+
+        rankRecvNotLeader.resize(0);
       }
       else
     
@@ -67,7 +79,7 @@
         {
           if (clientRank % (clientByServer + 1) == 0)
-            ranksServerLeader.push_back(clientRank / (clientByServer + 1));
+            rankRecvLeader.push_back(clientRank / (clientByServer + 1));
           else
-            ranksServerNotLeader.push_back(clientRank / (clientByServer + 1));
+            rankRecvNotLeader.push_back(clientRank / (clientByServer + 1));
         }
         else
     
@@ -75,11 +87,9 @@
           int rank = clientRank - (clientByServer + 1) * remain;
           if (rank % clientByServer == 0)
-            ranksServerLeader.push_back(remain + rank / clientByServer);
+            rankRecvLeader.push_back(remain + rank / clientByServer);
           else
-            ranksServerNotLeader.push_back(remain + rank / clientByServer);
-        }
-      }
-
-      timeLine = 0;
+            rankRecvNotLeader.push_back(remain + rank / clientByServer);
+        }
+      }
     }
 
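The hunks above move the leader election out of the constructor into the new computeLeader() helper, renaming the output lists to rankRecvLeader/rankRecvNotLeader. Because the surrounding arithmetic (serverByClient, clientByServer, remain, rankStart) is elided from the diff context, the following is only a minimal self-contained sketch of the mapping, reconstructed under the usual remainder-spreading assumption: with fewer clients than servers each client leads a contiguous block of servers, otherwise clients are grouped per server and only the first client of each group is that server's leader.

    // Sketch only: the block-distribution details of the elided context are
    // an assumption, not a verbatim copy of the XIOS source.
    #include <cstdio>
    #include <list>

    void computeLeaderSketch(int clientRank, int clientSize, int serverSize,
                             std::list<int>& leader, std::list<int>& notLeader)
    {
      if (0 == clientSize || 0 == serverSize) return;
      if (clientSize < serverSize)
      {
        // Each client leads a contiguous block of servers; the first
        // serverSize % clientSize clients receive one extra server.
        int serverByClient = serverSize / clientSize;
        int remain = serverSize % clientSize;
        int rankStart = serverByClient * clientRank;
        if (clientRank < remain) { ++serverByClient; rankStart += clientRank; }
        else rankStart += remain;
        for (int i = 0; i < serverByClient; i++) leader.push_back(rankStart + i);
      }
      else
      {
        // Clients are grouped per server; the first remain groups hold one
        // extra client, and only the first client of a group is the leader.
        int clientByServer = clientSize / serverSize;
        int remain = clientSize % serverSize;
        if (clientRank < (clientByServer + 1) * remain)
        {
          if (clientRank % (clientByServer + 1) == 0)
            leader.push_back(clientRank / (clientByServer + 1));
          else
            notLeader.push_back(clientRank / (clientByServer + 1));
        }
        else
        {
          int rank = clientRank - (clientByServer + 1) * remain;
          if (rank % clientByServer == 0)
            leader.push_back(remain + rank / clientByServer);
          else
            notLeader.push_back(remain + rank / clientByServer);
        }
      }
    }

    int main()
    {
      // 5 clients, 2 servers: clients 0 and 3 become leaders of servers 0 and 1.
      for (int r = 0; r < 5; ++r)
      {
        std::list<int> lead, rest;
        computeLeaderSketch(r, 5, 2, lead, rest);
        for (std::list<int>::iterator it = lead.begin(); it != lead.end(); ++it)
          std::printf("client %d is leader for server %d\n", r, *it);
      }
      return 0;
    }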
     
@@ -92,11 +102,27 @@
       list<int> ranks = event.getRanks();
 
+      if (CXios::checkEventSync)
+      {
+        int typeId, classId, typeId_in, classId_in, timeLine_out;
+        typeId_in=event.getTypeId() ;
+        classId_in=event.getClassId() ;
+        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ;
+        MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
+        MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
+        if (typeId/clientSize!=event.getTypeId() || classId/clientSize!=event.getClassId() || timeLine_out/clientSize!=timeLine)
+        {
+           ERROR("void CContextClient::sendEvent(CEventClient& event)",
+               << "Event are not coherent between client.");
+        }
+      }
+
       if (!event.isEmpty())
       {
         list<int> sizes = event.getSizes();
 
-        // We force the getBuffers call to be non-blocking on the servers
+        // We force the getBuffers call to be non-blocking on classical servers
         list<CBufferOut*> buffList;
-        bool couldBuffer = getBuffers(ranks, sizes, buffList, !CXios::isClient);
+        bool couldBuffer = getBuffers(ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0) ));
+//        bool couldBuffer = getBuffers(ranks, sizes, buffList, CXios::isServer );
 
         if (couldBuffer)
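The new CXios::checkEventSync guard is a cheap collective sanity check: every client contributes its event type, class and timeline to an MPI_SUM reduction, and if all clients queued the same event then the sum divided by clientSize reproduces the local value on every rank. It can in principle miss mismatches whose values happen to average back to the local one, but it catches the common desynchronization bugs at the cost of three reductions. A minimal standalone MPI illustration of the same idea (values and names are hypothetical, this is not XIOS code):

    #include <mpi.h>
    #include <cstdio>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int rank, size;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);
      MPI_Comm_size(MPI_COMM_WORLD, &size);

      int eventId = 42;  // make one rank differ to trigger the mismatch branch
      int sum = 0;
      MPI_Allreduce(&eventId, &sum, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);

      // If every rank contributed the same id, sum / size equals the local id.
      if (sum / size != eventId)
      {
        std::fprintf(stderr, "rank %d: events are not coherent between clients\n", rank);
        MPI_Abort(MPI_COMM_WORLD, 1);
      }
      MPI_Finalize();
      return 0;
    }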
     
@@ -119,5 +145,5 @@
           for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++)
             tmpBufferedEvent.buffers.push_back(new CBufferOut(*it));
-
+          info(100)<<"DEBUG : temporaly event created : timeline "<<timeLine<<endl ;
           event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers);
         }
     
@@ -146,4 +172,5 @@
             (*itBuffer)->put((char*)(*it)->start(), (*it)->count());
 
+          info(100)<<"DEBUG : temporaly event sent "<<endl ;
           checkBuffers(tmpBufferedEvent.ranks);
 
     
@@ -187,5 +214,6 @@
      * \return whether the already allocated buffers could be used
     */
-    bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking /*= false*/)
+    bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers,
+                                    bool nonBlocking /*= false*/)
     {
       list<int>::const_iterator itServer, itSize;
     
@@ -216,7 +244,21 @@
         {
           checkBuffers();
-          context->server->listen();
+          if (CServer::serverLevel == 0)
+            context->server->listen();
+
+          else if (CServer::serverLevel == 1)
+          {
+            context->server->listen();
+            for (int i = 0; i < context->serverPrimServer.size(); ++i)
+              context->serverPrimServer[i]->listen();
+            CServer::contextEventLoop(false) ; // avoid dead-lock at finalize...
+          }
+
+          else if (CServer::serverLevel == 2)
+            context->server->listen();
+
         }
       } while (!areBuffersFree && !nonBlocking);
+
       CTimer::get("Blocking time").suspend();
 
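With the two-level server scheme merged here, a level-1 process acts as server toward the clients below it and as client toward the level-2 servers above it. While such a process spins waiting for free buffers it therefore has to keep servicing both roles and pump the context event loop, or two processes can each block waiting for the other to drain a buffer. A schematic, compilable sketch of that waiting pattern with stand-in types (all names here are hypothetical; the real interfaces are CServer and CContextServer):

    #include <cstddef>
    #include <vector>

    struct Listener { void listen() { /* poll one request; never blocks */ } };

    bool waitForBuffers(bool (*tryReserve)(), Listener& ownServer,
                        std::vector<Listener*>& primServers, bool nonBlocking)
    {
      bool free = false;
      do
      {
        free = tryReserve();                      // attempt the allocation
        if (!free)
        {
          ownServer.listen();                     // requests from the level below
          for (std::size_t i = 0; i < primServers.size(); ++i)
            primServers[i]->listen();             // requests from the level above
          // a real level-1 server also pumps CServer::contextEventLoop(false)
          // here, which is what avoids the dead-lock at finalize.
        }
      } while (!free && !nonBlocking);
      return free;
    }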
     
@@ -257,13 +299,18 @@
       map<int,CClientBuffer*>::iterator itBuff;
       bool pending = false;
-      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) pending |= itBuff->second->checkBuffer();
+      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
+        pending |= itBuff->second->checkBuffer();
       return pending;
    }
 
    //! Release all buffers
-   void CContextClient::releaseBuffers(void)
+   void CContextClient::releaseBuffers()
    {
       map<int,CClientBuffer*>::iterator itBuff;
-      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) delete itBuff->second;
+      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
+      {
+          delete itBuff->second;
+      }
+      buffers.clear();
    }
 
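Beyond the cosmetic brace change, releaseBuffers() now clears the map after deleting its values. Without the clear(), the map keeps dangling pointers, so a second release or any later traversal of buffers would touch freed memory. The pattern in isolation, with a stand-in buffer type:

    #include <map>

    struct Buffer {};  // stand-in for CClientBuffer

    void releaseAll(std::map<int, Buffer*>& buffers)
    {
      for (std::map<int, Buffer*>::iterator it = buffers.begin(); it != buffers.end(); ++it)
        delete it->second;  // free each buffer...
      buffers.clear();      // ...then drop the dangling pointers: the call is now idempotent
    }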
     
@@ -362,54 +409,75 @@
 
    /*!
-   Finalize context client and do some reports
-   */
-   void CContextClient::finalize(void)
-   {
-     map<int,CClientBuffer*>::iterator itBuff;
-     bool stop = false;
-
-     CTimer::get("Blocking time").resume();
-     while (hasTemporarilyBufferedEvent())
-     {
-       checkBuffers();
-       sendTemporarilyBufferedEvent();
-     }
-     CTimer::get("Blocking time").suspend();
-
-     CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
-     if (isServerLeader())
-     {
-       CMessage msg;
-       const std::list<int>& ranks = getRanksServerLeader();
-       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
-         event.push(*itRank, 1, msg);
-       sendEvent(event);
-     }
-     else sendEvent(event);
-
-     CTimer::get("Blocking time").resume();
-     while (!stop)
-     {
-       checkBuffers();
-       if (hasTemporarilyBufferedEvent())
-         sendTemporarilyBufferedEvent();
-
-       stop = true;
-       for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest();
-     }
-     CTimer::get("Blocking time").suspend();
-
-     std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
-                                           iteMap = mapBufferSize_.end(), itMap;
-     StdSize totalBuf = 0;
-     for (itMap = itbMap; itMap != iteMap; ++itMap)
-     {
-       //report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl
-       //           << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
-       totalBuf += itMap->second;
-     }
-     //report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;
-
-     releaseBuffers();
-   }
+   * Finalize context client and do some reports. Function is non-blocking.
+   */
+  void CContextClient::finalize(void)
+  {
+    map<int,CClientBuffer*>::iterator itBuff;
+    bool stop = false;
+
+    CTimer::get("Blocking time").resume();
+    while (hasTemporarilyBufferedEvent())
+    {
+      checkBuffers();
+      sendTemporarilyBufferedEvent();
+    }
+    CTimer::get("Blocking time").suspend();
+
+    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
+    if (isServerLeader())
+    {
+      CMessage msg;
+      const std::list<int>& ranks = getRanksServerLeader();
+      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
+      {
+        #pragma omp critical (_output)
+        info(100)<<"DEBUG : Sent context Finalize event to rank "<<*itRank<<endl ;
+        event.push(*itRank, 1, msg);
+      }
+      sendEvent(event);
+    }
+    else sendEvent(event);
+
+    CTimer::get("Blocking time").resume();
+//    while (!stop)
+    {
+      checkBuffers();
+      if (hasTemporarilyBufferedEvent())
+        sendTemporarilyBufferedEvent();
+
+      stop = true;
+//      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest();
+    }
+    CTimer::get("Blocking time").suspend();
+
+    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
+                                          iteMap = mapBufferSize_.end(), itMap;
+
+    StdSize totalBuf = 0;
+    for (itMap = itbMap; itMap != iteMap; ++itMap)
+    {
+      #pragma omp critical (_output)
+      report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl
+                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
+      totalBuf += itMap->second;
+    }
+    #pragma omp critical (_output)
+    report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;
+
+    //releaseBuffers(); // moved to CContext::finalize()
+  }
+
+
+  /*!
+  */
+  bool CContextClient::havePendingRequests(void)
+  {
+    bool pending = false;
+    map<int,CClientBuffer*>::iterator itBuff;
+    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
+      pending |= itBuff->second->hasPendingRequest();
+    return pending;
+  }
+
+
 }
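The rewritten finalize() is now non-blocking: the drain loop is commented out, releaseBuffers() is deferred to CContext::finalize(), and the new havePendingRequests() exposes the predicate a caller needs to finish the job itself. A hedged sketch of how calling code might drive it (the helper name is made up; checkBuffers() and havePendingRequests() are the methods shown above):

    void finalizeAndDrain(CContextClient* client)
    {
      client->finalize();                    // posts CONTEXT_FINALIZE and returns at once
      while (client->havePendingRequests())  // new predicate added by this revision
        client->checkBuffers();              // progress the outstanding requests
      // buffer release now happens later, in CContext::finalize()
    }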