Ignore:
Timestamp:
05/15/17 15:00:24 (7 years ago)
Author:
oabramkina
Message:

Two-level server: merging new grid functionalities and changes in the communication protocol (e.g. non-blocking context finalize, registries, oasis).

Tests on curie: test_client, test_complete, nemo (test_xios2_cmip6.exe).

To do: non-structured grid, check reading, possible bug in client/server initialization (?).

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_olga/src/node/context.cpp

    r1129 r1130  
    2626      : CObjectTemplate<CContext>(), CContextAttributes() 
    2727      , calendar(), hasClient(false), hasServer(false) 
    28       , isPostProcessed(false), finalized(false) 
     28      , isPostProcessed(false)//, finalized(false) 
    2929      , idServer_(), client(0), server(0) 
    30       , allProcessed(false) 
     30      , allProcessed(false), countChildCtx_(0) 
    3131   { /* Ne rien faire de plus */ } 
    3232 
     
    3434      : CObjectTemplate<CContext>(id), CContextAttributes() 
    3535      , calendar(), hasClient(false), hasServer(false) 
    36       , isPostProcessed(false), finalized(false) 
     36      , isPostProcessed(false)//, finalized(false) 
    3737      , idServer_(), client(0), server(0) 
    38       , allProcessed(false) 
     38      , allProcessed(false), countChildCtx_(0) 
    3939   { /* Ne rien faire de plus */ } 
    4040 
     
    300300     std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize); 
    301301 
    302  
    303302     std::map<int, StdSize>::iterator it, ite = dataBufferSize.end(); 
    304303     for (it = dataBufferSize.begin(); it != ite; ++it) 
     
    351350     hasServer=true; 
    352351     server = new CContextServer(this,intraComm,interComm); 
    353 //     client = new CContextClient(this,intraComm,interComm, cxtClient); 
    354352 
    355353     registryIn=new CRegistry(intraComm); 
     
    373371       comms.push_back(interCommClient); 
    374372     } 
    375      client = new CContextClient(this,intraCommClient,interCommClient); 
    376  
     373     client = new CContextClient(this,intraCommClient,interCommClient,cxtClient); 
    377374   } 
    378375 
    379376   //! Try to send the buffers and receive possible answers 
    380    bool CContext::checkBuffersAndListen(void) 
    381    { 
    382      if (CServer::serverLevel == 0) 
    383      { 
    384        client->checkBuffers(); 
    385        bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent(); 
    386        if (hasTmpBufferedEvent) 
    387          hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent(); 
    388        // Don't process events if there is a temporarily buffered event 
    389        return server->eventLoop(!hasTmpBufferedEvent); 
    390      } 
    391  
    392      else if (CServer::serverLevel == 1) 
    393      { 
    394        client->checkBuffers(); 
    395        bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent(); 
    396        if (hasTmpBufferedEvent) 
    397          hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent(); 
    398        bool serverFinished = server->eventLoop(!hasTmpBufferedEvent); 
    399  
    400        bool serverPrimFinished = true; 
    401        for (int i = 0; i < clientPrimServer.size(); ++i) 
    402        { 
    403          clientPrimServer[i]->checkBuffers(); 
    404          bool hasTmpBufferedEventPrim = clientPrimServer[i]->hasTemporarilyBufferedEvent(); 
    405          if (hasTmpBufferedEventPrim) 
    406            hasTmpBufferedEventPrim = !clientPrimServer[i]->sendTemporarilyBufferedEvent(); 
    407 //         serverPrimFinished *= serverPrimServer[i]->eventLoop(!hasTmpBufferedEventPrim); 
    408          serverPrimFinished *= serverPrimServer[i]->eventLoop(); 
    409        } 
    410        return ( serverFinished && serverPrimFinished); 
    411      } 
    412  
    413      else if (CServer::serverLevel == 2) 
    414      { 
    415        client->checkBuffers(); 
    416        bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent(); 
    417 //       if (hasTmpBufferedEvent) 
    418 //         hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent(); 
    419 //       return server->eventLoop(!hasTmpBufferedEvent); 
    420        return server->eventLoop(); 
     377  bool CContext::checkBuffersAndListen(void) 
     378  { 
     379    bool clientReady, serverFinished; 
     380 
     381    // Only classical servers are non-blocking 
     382    if (CServer::serverLevel == 0) 
     383    { 
     384      client->checkBuffers(); 
     385      bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent(); 
     386      if (hasTmpBufferedEvent) 
     387        hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent(); 
     388      // Don't process events if there is a temporarily buffered event 
     389      return server->eventLoop(!hasTmpBufferedEvent); 
     390    } 
     391    else if (CServer::serverLevel == 1) 
     392    { 
     393      client->checkBuffers(); 
     394      bool serverFinished = server->eventLoop(); 
     395      bool serverPrimFinished = true; 
     396      for (int i = 0; i < clientPrimServer.size(); ++i) 
     397      { 
     398        clientPrimServer[i]->checkBuffers(); 
     399        serverPrimFinished *= serverPrimServer[i]->eventLoop(); 
    421400      } 
    422    } 
     401      return ( serverFinished && serverPrimFinished); 
     402    } 
     403 
     404    else if (CServer::serverLevel == 2) 
     405    { 
     406      client->checkBuffers(); 
     407      return server->eventLoop(); 
     408    } 
     409  } 
    423410 
    424411   //! Terminate a context 
    425412   void CContext::finalize(void) 
    426413   { 
    427      if (!finalized) 
    428      { 
    429        finalized = true; 
    430  
     414     // Send registry upon calling the function the first time 
     415     if (countChildCtx_ == 0) 
    431416       if (hasClient) sendRegistry() ; 
    432417 
    433        if ((hasClient) && (hasServer)) 
     418     // Client: 
     419     // (1) blocking send context finalize to its server 
     420     // (2) blocking receive context finalize from its server 
     421     if (CXios::isClient) 
     422     { 
     423       // Make sure that client (model) enters the loop only once 
     424       if (countChildCtx_ < 1) 
    434425       { 
     426         ++countChildCtx_; 
     427 
     428         client->finalize(); 
     429         while (client->havePendingRequests()) 
     430            client->checkBuffers(); 
     431 
     432         while (!server->hasFinished()) 
     433           server->eventLoop(); 
     434       } 
     435     } 
     436     // Server: non-blocking send context finalize 
     437     else if (CXios::isServer) 
     438     { 
     439       // First context finalize message received from a model => send context finalize to its child contexts (if any) 
     440       if (countChildCtx_ == 0) 
    435441         for (int i = 0; i < clientPrimServer.size(); ++i) 
    436442           clientPrimServer[i]->finalize(); 
    437443 
    438          for (int i = 0; i < serverPrimServer.size(); ++i) 
    439          { 
    440            while (!serverPrimServer[i]->hasFinished()) 
    441            { 
    442              serverPrimServer[i]->eventLoop(); 
    443              CServer::eventScheduler->checkEvent() ; 
    444            } 
    445          } 
    446        } 
    447        client->finalize(); 
    448        while (!server->hasFinished()) 
    449        { 
    450          server->eventLoop(); 
    451        } 
    452  
    453        info(20)<<"Server Side context <"<<getId()<<"> finalized"<<endl; 
    454        report(0)<< " Memory report : Context <"<<getId()<<"> : server side : total memory used for buffers "<<CContextServer::getTotalBuf()<<" bytes"<<endl; 
    455  
    456 //       if (hasServer) 
    457        if (hasServer && !hasClient) 
    458        { 
    459          closeAllFile(); 
    460          registryOut->hierarchicalGatherRegistry() ; 
    461          if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 
    462        } 
    463       } 
    464    } 
    465  
    466    //! Free internally allocated communicators 
    467    void CContext::freeComms(void) 
    468    { 
     444       // (Last) context finalized message received => send context finalize to its parent context 
     445       if (countChildCtx_ == clientPrimServer.size()) 
     446         client->finalize(); 
     447 
     448       ++countChildCtx_; 
     449     } 
     450 
     451     // If in mode attache call postFinalize 
     452     if (CXios::isServer && CXios::isClient) 
     453       postFinalize(); 
     454   } 
     455 
     456   /*! 
     457   * \fn void CContext::postFinalize(void) 
     458   * Close files, gather registries, , and make deallocations. 
     459   * Function is called when a context is finalized (it has nothing to receive and nothing to send). 
     460   */ 
     461   void CContext::postFinalize(void) 
     462   { 
     463     info(20)<<"Context <"<<getId()<<"> is finalized."<<endl; 
     464 
     465     //     if (hasServer && !hasClient) 
     466     { 
     467       closeAllFile(); 
     468       registryOut->hierarchicalGatherRegistry() ; 
     469       if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; 
     470     } 
     471 
     472     //! Free internally allocated communicators 
    469473     for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 
    470474       MPI_Comm_free(&(*it)); 
    471475     comms.clear(); 
     476 
     477     //! Deallocate client buffers 
     478     client->releaseBuffers(); 
     479     for (int i = 0; i < clientPrimServer.size(); ++i) 
     480       clientPrimServer[i]->releaseBuffers(); 
     481   } 
     482 
     483   //! Free internally allocated communicators 
     484   void CContext::freeComms(void) 
     485   { 
     486     for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 
     487       MPI_Comm_free(&(*it)); 
     488     comms.clear(); 
     489   } 
     490 
     491   //! Deallocate buffers allocated by clientContexts 
     492   void CContext::releaseClientBuffers(void) 
     493   { 
     494     client->releaseBuffers(); 
     495     for (int i = 0; i < clientPrimServer.size(); ++i) 
     496       clientPrimServer[i]->releaseBuffers(); 
    472497   } 
    473498 
     
    509534 
    510535       // After that, send all grid (if any) 
    511        sendRefGrid();        
     536       sendRefGrid(); 
    512537 
    513538       // We have a xml tree on the server side and now, it should be also processed 
    514539       sendPostProcessing(); 
    515  
    516540       sendGridEnabledFields();        
    517541     } 
    518  
    519542     allProcessed = true; 
    520543   } 
     
    555578 
    556579   void CContext::recvPostProcessingGlobalAttributes(CBufferIn& buffer) 
    557    {       
     580   { 
     581      // CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar(); 
    558582      postProcessingGlobalAttributes(); 
    559583   } 
     
    572596    postProcessingGlobalAttributes(); 
    573597 
    574     if (hasClient) sendPostProcessingGlobalAttributes();    
    575      
     598    if (hasClient) sendPostProcessingGlobalAttributes(); 
     599 
     600    // There are some processings that should be done after all of above. For example: check mask or index 
    576601    this->buildFilterGraphOfEnabledFields(); 
    577602     
    578     if (hasClient && !hasServer) 
     603     if (hasClient && !hasServer) 
    579604    { 
    580       buildFilterGraphOfFieldsWithReadAccess();       
     605      buildFilterGraphOfFieldsWithReadAccess(); 
    581606    } 
    582607 
     
    622647     { 
    623648       this->enabledFiles[i]->solveOnlyRefOfEnabledFields(false); 
    624        // this->enabledFiles[i]->solveAllReferenceEnabledField(false); 
    625649     } 
    626650 
     
    649673   } 
    650674 
    651  
    652  
    653675   void CContext::solveOnlyRefOfEnabledFields(bool sendToServer) 
    654676   { 
     
    663685       this->enabledFiles[i]->generateNewTransformationGridDest(); 
    664686     } 
     687 
    665688   } 
    666689 
     
    857880   } 
    858881 
    859 //   // Only send close definition from process having hasClient 
    860 //   void CContext::sendCloseDefinitionToServer(void) 
    861 //   { 
    862 //     CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION); 
    863 //   } 
    864  
    865882   //! Client side: Send a message to server to make it close 
    866883   void CContext::sendCloseDefinition(void) 
    867884   { 
    868885     // Use correct context client to send message 
    869      // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1; 
    870886     int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 
    871887     for (int i = 0; i < nbSrvPools; ++i) 
     
    902918   { 
    903919     // Use correct context client to send message 
    904 //     CContextClient* contextClientTmp = (0 != clientPrimServer) ? clientPrimServer : client; 
    905      // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1; 
    906920    int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; 
    907921     for (int i = 0; i < nbSrvPools; ++i) 
     
    14621476   void CContext::updateCalendar(int step) 
    14631477   { 
    1464       info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl; 
     1478      info(50) <<"Context "<< this->getId() <<" updateCalendar : before : " << calendar->getCurrentDate() << endl; 
    14651479      calendar->update(step); 
    1466       info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl; 
     1480      info(50) <<"Context "<< this->getId() << " updateCalendar : after : " << calendar->getCurrentDate() << endl; 
    14671481 
    14681482      if (hasClient) 
     
    15691583  } 
    15701584 
     1585  /*! 
     1586  * \fn bool CContext::isFinalized(void) 
     1587  * Context is finalized if: 
     1588  * (1) it has received all context finalize events 
     1589  * (2) it has no pending events to send. 
     1590  */ 
    15711591  bool CContext::isFinalized(void) 
    15721592  { 
    1573     return finalized; 
     1593    if (countChildCtx_==clientPrimServer.size()+1) 
     1594    { 
     1595      bool buffersReleased = !client->havePendingRequests(); 
     1596      for (int i = 0; i < clientPrimServer.size(); ++i) 
     1597        buffersReleased *= !clientPrimServer[i]->havePendingRequests(); 
     1598      return buffersReleased; 
     1599    } 
     1600    else 
     1601      return false; 
    15741602  } 
    15751603 
Note: See TracChangeset for help on using the changeset viewer.