Changeset 2343


Ignore:
Timestamp:
06/29/22 18:19:45 (23 months ago)
Author:
ymipsl
Message:
  • Implement new infrastructure for transfert protocol.
  • new purelly one sided protocol is now available, the previous protocol (legacy, mix send/recv and one sided) is still available. Other specific protocol could be implemented more easilly in future.
  • switch can be operate with "transport_protocol" variable in XIOS context :

ex:
<variable id="transport_protocol" type="string">one_sided</variable>

Available protocols are : one_sided, legacy or default. The default protocol is "legacy".

YM

Location:
XIOS/dev/dev_ym/XIOS_COUPLING/src
Files:
19 added
4 deleted
9 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/event_client.cpp

    r1377 r2343  
    5656     } 
    5757   } 
     58 
     59   void CEventClient::send(size_t timeLine, int size, CBufferOut* buffer) { *buffer << size << timeLine << *itNbSenders << classId << typeId << **itMessages;} 
     60  
    5861} 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/event_client.hpp

    r1377 r2343  
    1717      void push(int rank, int nbSender, CMessage& msg); 
    1818      void send(size_t timeLine, const std::list<int>& sizes, std::list<CBufferOut*>&);  
    19  
    2019      bool isEmpty(void); 
    2120      std::list<int> getRanks(void); 
     
    2423      int getTypeId(void) { return typeId; } 
    2524       
     25      void setFirst(void) 
     26      { 
     27        itRanks=ranks.begin() ; 
     28        itNbSenders=nbSenders.begin() ; 
     29        itMessages=messages.begin() ; 
     30      } 
     31 
     32      bool isFirst(void) 
     33      { 
     34        return itRanks==ranks.begin() ; 
     35      } 
     36 
     37      void next(void) 
     38      { 
     39        itRanks++ ; 
     40        if (itRanks==ranks.end()) itRanks=ranks.begin() ; 
     41        itNbSenders++ ; 
     42        if (itNbSenders==nbSenders.end()) itNbSenders=nbSenders.begin() ; 
     43        itMessages++ ; 
     44        if (itMessages==messages.end()) itMessages=messages.begin() ; 
     45      } 
     46       
     47      void remove(void) 
     48      { 
     49        auto removedRank = itRanks; 
     50        itRanks++ ; 
     51        ranks.erase(removedRank) ; 
     52        if (itRanks==ranks.end()) itRanks=ranks.begin() ; 
     53 
     54        auto removedNbSender = itNbSenders ; 
     55        itNbSenders++ ; 
     56        nbSenders.erase(removedNbSender) ; 
     57        if (itNbSenders==nbSenders.end()) itNbSenders=nbSenders.begin() ; 
     58 
     59        auto removedMessage = itMessages ; 
     60        itMessages++ ; 
     61        messages.erase(removedMessage) ; 
     62        if (itMessages==messages.end()) itMessages=messages.begin() ; 
     63      } 
     64 
     65      int getRank(void) { return *itRanks ;} 
     66      int getNbSender(void) { return *itNbSenders ;} 
     67      int getSize(void) { return (*itMessages)->size() + headerSize;} 
     68      void send(size_t timeLine, int size, CBufferOut* buffer) ; 
    2669    private: 
    2770      int classId; 
     
    3073      std::list<int> nbSenders; 
    3174      std::list<CMessage*> messages; 
     75       
     76      std::list<int>::iterator       itRanks; 
     77      std::list<int>::iterator       itNbSenders; 
     78      std::list<CMessage*>::iterator itMessages; 
    3279  }; 
    3380} 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/event_server.cpp

    r716 r2343  
    3535    ev.buffer = new CBufferIn(buffer.ptr(), buffer.remain()); 
    3636    ev.size = size;   
     37    ev.startBuffer = startBuffer ; // for one sided, take ownership of buffer for now 
    3738    subEvents.push_back(ev); 
    3839   
     
    4445    } 
    4546  } 
     47 
    4648 
    4749  bool CEventServer::isFull(void)   
     
    5557    for (it = subEvents.begin(); it != subEvents.end(); it++) 
    5658    { 
    57       it->serverBuffer->freeBuffer(it->size); 
    58       delete it->buffer; 
     59      if (it->serverBuffer==nullptr) // one_sided case 
     60      { 
     61        delete [] it->startBuffer ; 
     62        delete it->buffer; 
     63      } 
     64      else // legacy case 
     65      { 
     66        it->serverBuffer->freeBuffer(it->size); 
     67        delete it->buffer; 
     68      } 
    5969    } 
    6070  } 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/event_server.hpp

    r1853 r2343  
    2121 
    2222    void push(int rank,CServerBuffer* serverBuffer ,char* startBuffer,int size) ; 
     23    int getNbSender(void) {return nbSender ;} 
     24 
    2325    CContextServer* getContextServer(void) { return contextServer_ ;} 
    2426     
     
    2628    { 
    2729      int rank ; 
     30      char* startBuffer ; 
    2831      CServerBuffer* serverBuffer ; 
    2932      CBufferIn*  buffer ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/axis.cpp

    r2339 r2343  
    190190     { 
    191191       // size estimation for sendDistributedValue 
    192        std::unordered_map<int, vector<size_t> >::const_iterator it, ite = indSrv_[client->serverSize].end(); 
    193        for (it = indSrv_[client->serverSize].begin(); it != ite; ++it) 
     192       std::unordered_map<int, vector<size_t> >::const_iterator it, ite = indSrv_[client->getRemoteSize()].end(); 
     193       for (it = indSrv_[client->getRemoteSize()].begin(); it != ite; ++it) 
    194194       { 
    195195         size_t size = 6 * sizeof(size_t); 
     
    890890    if (type==EDistributionType::BANDS) // Bands distribution to send to file server 
    891891    { 
    892       int nbServer = client->serverSize; 
    893       int nbClient = client->clientSize ; 
    894       int rankClient = client->clientRank ; 
     892      int nbServer = client->getRemoteSize(); 
     893      int nbClient = client->getIntraCommSize() ; 
     894      int rankClient = client->getIntraCommRank() ; 
    895895      int size = nbServer / nbClient ; 
    896896      int start ; 
     
    921921    else if (type==EDistributionType::NONE) // domain is not distributed ie all servers get the same local domain 
    922922    { 
    923       int nbServer = client->serverSize; 
     923      int nbServer = client->getRemoteSize(); 
    924924      size_t nglo=n_glo ; 
    925925      CArray<size_t,1> indGlo(nglo) ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp

    r2337 r2343  
    77#include "duration.hpp" 
    88 
    9 #include "context_client.hpp" 
    10 #include "context_server.hpp" 
     9#include "legacy_context_client.hpp" 
     10#include "legacy_context_server.hpp" 
     11#include "one_sided_context_client.hpp" 
     12#include "one_sided_context_server.hpp" 
    1113#include "nc4_data_output.hpp" 
    1214#include "node_type.hpp" 
     
    532534    comms.push_back(intraCommClient); 
    533535    // attached_mode=parentServerContext_->isAttachedMode() ; //ym probably inherited from source context 
    534     server = new CContextServer(this,intraComm_, interCommServer); // check if we need to dupl. intraComm_ ? 
    535     client = new CContextClient(this,intraCommClient,interCommClient); 
     536    server = CContextServer::getNew(this,intraComm_, interCommServer); // check if we need to dupl. intraComm_ ? 
     537    client = CContextClient::getNew(this,intraCommClient,interCommClient); 
    536538    client->setAssociatedServer(server) ;   
    537539    server->setAssociatedClient(client) ;   
     
    594596      intraCommClient=intraComm_ ; 
    595597      MPI_Comm_dup(intraComm_, &intraCommServer) ; 
    596       client = new CContextClient(this, intraCommClient, interCommClient); 
    597       server = new CContextServer(this, intraCommServer, interCommServer); 
     598      client = CContextClient::getNew(this, intraCommClient, interCommClient); 
     599      server = CContextServer::getNew(this, intraCommServer, interCommServer); 
    598600      client->setAssociatedServer(server) ; 
    599601      server->setAssociatedClient(client) ; 
     
    631633        MPI_Comm_dup(intraComm_, &intraCommServer) ; 
    632634 
    633         CContextClient* client = new CContextClient(this, intraCommClient, interCommClient) ; 
    634         CContextServer* server = new CContextServer(this, intraCommServer, interCommServer) ; 
     635        CContextClient* client = CContextClient::getNew(this, intraCommClient, interCommClient) ; 
     636        CContextServer* server = CContextServer::getNew(this, intraCommServer, interCommServer) ; 
    635637        client->setAssociatedServer(server) ; 
    636638        server->setAssociatedClient(client) ; 
     
    728730        MPI_Comm_dup(interComm, &interCommClient) ; 
    729731        MPI_Comm_dup(interComm, &interCommServer) ; 
    730         CContextClient* client = new CContextClient(this, intraCommClient, interCommClient); 
    731         CContextServer* server = new CContextServer(this, intraCommServer, interCommServer); 
     732        CContextClient* client = CContextClient::getNew(this, intraCommClient, interCommClient); 
     733        CContextServer* server = CContextServer::getNew(this, intraCommServer, interCommServer); 
    732734        client->setAssociatedServer(server) ; 
    733735        server->setAssociatedClient(client) ; 
     
    750752       MPI_Comm_dup(interComm, &interCommServer) ; 
    751753       MPI_Comm_dup(interComm, &interCommClient) ; 
    752        CContextServer* server = new CContextServer(this, intraCommServer, interCommServer); 
    753        CContextClient* client = new CContextClient(this, intraCommClient, interCommClient); 
     754       CContextServer* server = CContextServer::getNew(this, intraCommServer, interCommServer); 
     755       CContextClient* client = CContextClient::getNew(this, intraCommClient, interCommClient); 
    754756       client->setAssociatedServer(server) ; 
    755757       server->setAssociatedClient(client) ; 
     
    765767   { 
    766768      registryOut->hierarchicalGatherRegistry() ; 
    767       if (server->intraCommRank==0) CXios::getRegistryManager()->merge(*registryOut) ; 
     769      if (server->getIntraCommRank()==0) CXios::getRegistryManager()->merge(*registryOut) ; 
    768770 
    769771      if (serviceType_==CServicesManager::CLIENT) 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/domain.cpp

    r2339 r2343  
    162162     } 
    163163 
    164      std::unordered_map<int, vector<size_t> >::const_iterator itIndexEnd = indSrv_[client->serverSize].end(); 
     164     std::unordered_map<int, vector<size_t> >::const_iterator itIndexEnd = indSrv_[client->getRemoteSize()].end(); 
    165165     // std::map<int, std::vector<int> >::const_iterator itWrittenIndexEnd = indWrittenSrv_.end(); 
    166      for (size_t k = 0; k < connectedServerRank_[client->serverSize].size(); ++k) 
    167      { 
    168        int rank = connectedServerRank_[client->serverSize][k]; 
    169        std::unordered_map<int, std::vector<size_t> >::const_iterator it = indSrv_[client->serverSize].find(rank); 
     166     for (size_t k = 0; k < connectedServerRank_[client->getRemoteSize()].size(); ++k) 
     167     { 
     168       int rank = connectedServerRank_[client->getRemoteSize()][k]; 
     169       std::unordered_map<int, std::vector<size_t> >::const_iterator it = indSrv_[client->getRemoteSize()].find(rank); 
    170170       size_t idxCount = (it != itIndexEnd) ? it->second.size() : 0; 
    171171 
     
    18831883    if (type==EDistributionType::BANDS) // Bands distribution to send to file server 
    18841884    { 
    1885       int nbServer = client->serverSize; 
     1885      int nbServer = client->getRemoteSize(); 
    18861886      std::vector<int> nGlobDomain(2); 
    18871887      nGlobDomain[0] = this->ni_glo; 
     
    19191919    if (distType==EDistributionType::BANDS) // Bands distribution to send to file server 
    19201920    { 
    1921       int nbServer = client->serverSize; 
    1922       int nbClient = client->clientSize ; 
    1923       int rankClient = client->clientRank ; 
     1921      int nbServer = client->getRemoteSize(); 
     1922      int nbClient = client->getIntraCommSize() ; 
     1923      int rankClient = client->getIntraCommRank() ; 
    19241924      int size = nbServer / nbClient ; 
    19251925      int start ; 
     
    19521952    else if (distType==EDistributionType::COLUMNS) // Bands distribution to send to file server 
    19531953    { 
    1954       int nbServer = client->serverSize; 
    1955       int nbClient = client->clientSize ; 
    1956       int rankClient = client->clientRank ; 
     1954      int nbServer = client->getRemoteSize(); 
     1955      int nbClient = client->getIntraCommSize() ; 
     1956      int rankClient = client->getIntraCommRank() ; 
    19571957      int size = nbServer / nbClient ; 
    19581958      int start ; 
     
    19881988    else if (distType==EDistributionType::NONE) // domain is not distributed ie all servers get the same local domain 
    19891989    { 
    1990       int nbServer = client->serverSize; 
     1990      int nbServer = client->getRemoteSize(); 
    19911991      int nglo=ni_glo*nj_glo ; 
    19921992      CArray<size_t,1> indGlo ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/grid.cpp

    r2316 r2343  
    944944 
    945945     std::map<int, StdSize> dataSizes; 
    946      int receiverSize = client->serverSize; 
     946     int receiverSize = client->getRemoteSize(); 
    947947     std::map<int,size_t>& dataSizeMap = bufferForWriting ? connectedDataSize_[receiverSize]: connectedDataSizeRead_; 
    948948     std::vector<int>& connectedServerRanks = bufferForWriting ? connectedServerRank_[receiverSize] : connectedServerRankRead_; 
     
    13951395    else if (0 != client) 
    13961396    { 
    1397       return  (isDataDistributed() ||  (1 != client->clientSize) || (1 != client->serverSize)); 
     1397      return  (isDataDistributed() ||  (1 != client->getRemoteSize()) || (1 != client->getRemoteSize())); 
    13981398    } 
    13991399    else 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/scalar.cpp

    r2304 r2343  
    411411    else if (type==EDistributionType::NONE) // domain is not distributed ie all servers get the same local domain 
    412412    { 
    413       int nbServer = client->serverSize; 
     413      int nbServer = client->getRemoteSize(); 
    414414      CArray<size_t,1> indGlo(nglo) ; 
    415415      for(size_t i=0;i<nglo;i++) indGlo(i) = i ; 
Note: See TracChangeset for help on using the changeset viewer.