Changeset 597


Ignore:
Timestamp:
05/26/15 16:13:47 (9 years ago)
Author:
rlacroix
Message:

Add basic infrastructure for servers to clients communication.

Location:
XIOS/trunk/src
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • XIOS/trunk/src/client.cpp

    r591 r597  
    201201        context->initClient(contextComm,contextInterComm, contextServer); 
    202202 
    203  
    204203        // Secondly, initialize context on server side 
    205         contextServer->initServer(contextComm,contextInterComm); 
     204        contextServer->initServer(contextComm,contextInterComm, context); 
    206205 
    207206        // Finally, we should return current context to context client 
  • XIOS/trunk/src/context_client.cpp

    r596 r597  
    206206         free &= (*itBuffer)->isBufferFree(*itSize); 
    207207        } 
     208        if (!free) 
     209          context->server->listen(); 
    208210      } 
    209211      CTimer::get("Blocking time").suspend(); 
  • XIOS/trunk/src/context_server.cpp

    r549 r597  
    5252  } 
    5353 
     54  bool CContextServer::hasFinished(void) 
     55  { 
     56    return finished; 
     57  } 
     58 
    5459  bool CContextServer::eventLoop(void) 
    5560  { 
     
    7984        { 
    8085          it=buffers.find(rank); 
    81           if (it==buffers.end()) 
     86          if (it==buffers.end()) // Receive the buffer size and allocate the buffer 
    8287          { 
    8388            StdSize buffSize = 0; 
     
    169174      if (event->isFull()) 
    170175      { 
    171         if (!scheduled && !CXios::isServer) 
     176        if (!scheduled && CServer::eventScheduler) // Skip event scheduling for attached mode and reception on client side 
    172177        { 
    173178          CServer::eventScheduler->registerEvent(currentTimeLine,hashId); 
    174179          scheduled=true; 
    175180        } 
    176         else if (CXios::isServer || CServer::eventScheduler->queryEvent(currentTimeLine,hashId) ) 
     181        else if (!CServer::eventScheduler || CServer::eventScheduler->queryEvent(currentTimeLine,hashId) ) 
    177182        { 
    178183         CTimer::get("Process events").resume(); 
     
    207212    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE) 
    208213    { 
     214      finished=true; 
    209215      info(20)<<"Server Side context <"<<context->getId()<<"> finalized"<<endl; 
    210216      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 
     
    218224      } 
    219225      context->finalize(); 
    220       finished=true; 
    221226      report(0)<< " Memory report : Context <"<<context->getId()<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl; 
    222227    } 
  • XIOS/trunk/src/context_server.hpp

    r591 r597  
    2323    void setPendingEvent(void) ; 
    2424    bool hasPendingEvent(void) ; 
     25    bool hasFinished(void); 
    2526 
    2627    MPI_Comm intraComm ; 
  • XIOS/trunk/src/interface/c/iccalendar.cpp

    r591 r597  
    1313    CTimer::get("XIOS").resume(); 
    1414    xios::CContext* context = CContext::getCurrent(); 
    15     if (!context->hasServer) context->client->checkBuffers(); 
     15    if (!context->hasServer) context->checkBuffersAndListen(); 
    1616    context->updateCalendar(step); 
    1717    context->sendUpdateCalendar(step); 
  • XIOS/trunk/src/interface/c/icdata.cpp

    r593 r597  
    373373      CTimer::get("XIOS send field").resume(); 
    374374      CContext* context = CContext::getCurrent(); 
    375       if (!context->hasServer) context->client->checkBuffers(); 
     375      if (!context->hasServer) context->checkBuffersAndListen(); 
    376376      CArray<double, 1> data(data_k8, shape(data_Xsize), neverDeleteData); 
    377377      CField::get(fieldid_str)->setData(data); 
     
    389389 
    390390      CContext* context = CContext::getCurrent(); 
    391       if (!context->hasServer) context->client->checkBuffers(); 
     391      if (!context->hasServer) context->checkBuffersAndListen(); 
    392392 
    393393      CArray<double, 1> data(data_k8, shape(data_Xsize), neverDeleteData); 
     
    407407 
    408408      CContext* context = CContext::getCurrent(); 
    409       if (!context->hasServer) context->client->checkBuffers(); 
     409      if (!context->hasServer) context->checkBuffersAndListen(); 
    410410 
    411411      CArray<double, 2>data(data_k8, shape(data_Xsize, data_Ysize), neverDeleteData); 
     
    425425 
    426426      CContext* context = CContext::getCurrent(); 
    427       if (!context->hasServer) context->client->checkBuffers(); 
     427      if (!context->hasServer) context->checkBuffersAndListen(); 
    428428 
    429429      CArray<double, 3>data(data_k8, shape(data_Xsize, data_Ysize, data_Zsize), neverDeleteData); 
     
    442442      CTimer::get("XIOS send field").resume(); 
    443443      CContext* context = CContext::getCurrent(); 
    444       if (!context->hasServer) context->client->checkBuffers(); 
     444      if (!context->hasServer) context->checkBuffersAndListen(); 
    445445 
    446446      CArray<float, 1> data_tmp(data_k4, shape(data_Xsize), neverDeleteData); 
     
    461461 
    462462      CContext* context = CContext::getCurrent(); 
    463       if (!context->hasServer) context->client->checkBuffers(); 
     463      if (!context->hasServer) context->checkBuffersAndListen(); 
    464464 
    465465      CArray<float, 1> data_tmp(data_k4, shape(data_Xsize), neverDeleteData); 
     
    481481 
    482482      CContext* context = CContext::getCurrent(); 
    483       if (!context->hasServer) context->client->checkBuffers(); 
     483      if (!context->hasServer) context->checkBuffersAndListen(); 
    484484 
    485485      CArray<float, 2> data_tmp(data_k4, shape(data_Xsize, data_Ysize), neverDeleteData); 
     
    501501 
    502502      CContext* context = CContext::getCurrent(); 
    503       if (!context->hasServer) context->client->checkBuffers(); 
     503      if (!context->hasServer) context->checkBuffersAndListen(); 
    504504 
    505505      CArray<float, 3> data_tmp(data_k4, shape(data_Xsize, data_Ysize, data_Zsize), neverDeleteData); 
     
    523523 
    524524      CContext* context = CContext::getCurrent(); 
    525       if (!context->hasServer) context->client->checkBuffers(); 
     525      if (!context->hasServer) context->checkBuffersAndListen(); 
    526526 
    527527      CArray<double, 1> data(data_k8, shape(data_Xsize), neverDeleteData); 
     
    541541 
    542542      CContext* context = CContext::getCurrent(); 
    543       if (!context->hasServer) context->client->checkBuffers(); 
     543      if (!context->hasServer) context->checkBuffersAndListen(); 
    544544 
    545545      CArray<double, 2>data(data_k8, shape(data_Xsize, data_Ysize), neverDeleteData); 
     
    559559 
    560560      CContext* context = CContext::getCurrent(); 
    561       if (!context->hasServer) context->client->checkBuffers(); 
     561      if (!context->hasServer) context->checkBuffersAndListen(); 
    562562 
    563563      CArray<double, 3>data(data_k8, shape(data_Xsize, data_Ysize, data_Zsize), neverDeleteData); 
     
    577577 
    578578      CContext* context = CContext::getCurrent(); 
    579       if (!context->hasServer) context->client->checkBuffers(); 
     579      if (!context->hasServer) context->checkBuffersAndListen(); 
    580580 
    581581      CArray<double, 1> data(data_Xsize); 
     
    597597 
    598598      CContext* context = CContext::getCurrent(); 
    599       if (!context->hasServer) context->client->checkBuffers(); 
     599      if (!context->hasServer) context->checkBuffersAndListen(); 
    600600 
    601601      CArray<double, 2> data(data_Xsize, data_Ysize); 
     
    617617 
    618618      CContext* context = CContext::getCurrent(); 
    619       if (!context->hasServer) context->client->checkBuffers(); 
     619      if (!context->hasServer) context->checkBuffersAndListen(); 
    620620 
    621621      CArray<double, 3> data(data_Xsize, data_Ysize, data_Zsize); 
  • XIOS/trunk/src/node/context.cpp

    r595 r597  
    2323   CContext::CContext(void) 
    2424      : CObjectTemplate<CContext>(), CContextAttributes() 
    25       , calendar(),hasClient(false),hasServer(false), isPostProcessed(false), dataSize_(), idServer_() 
     25      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false) 
     26      , dataSize_(), idServer_(), client(0), server(0) 
    2627   { /* Ne rien faire de plus */ } 
    2728 
    2829   CContext::CContext(const StdString & id) 
    2930      : CObjectTemplate<CContext>(id), CContextAttributes() 
    30       , calendar(),hasClient(false),hasServer(false), isPostProcessed(false), dataSize_(), idServer_() 
     31      , calendar(), hasClient(false), hasServer(false), isPostProcessed(false), finalized(false) 
     32      , dataSize_(), idServer_(), client(0), server(0) 
    3133   { /* Ne rien faire de plus */ } 
    3234 
    3335   CContext::~CContext(void) 
    3436   { 
    35      if (hasClient) delete client; 
    36      if (hasServer) delete server; 
     37     delete client; 
     38     delete server; 
    3739   } 
    3840 
     
    231233 
    232234   //! Initialize client side 
    233    void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer) 
     235   void CContext::initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer /*= 0*/) 
    234236   { 
    235237     hasClient=true; 
    236238     client = new CContextClient(this,intraComm, interComm, cxtServer); 
     239     MPI_Comm intraCommServer, interCommServer; 
     240     if (cxtServer) // Attached mode 
     241     { 
     242       intraCommServer = intraComm; 
     243       interCommServer = interComm; 
     244     } 
     245     else 
     246     { 
     247       MPI_Comm_dup(intraComm, &intraCommServer); 
     248       MPI_Comm_dup(interComm, &interCommServer); 
     249     } 
     250     server = new CContextServer(this,intraCommServer,interCommServer); 
    237251   } 
    238252 
     
    277291 
    278292   //! Initialize server 
    279    void CContext::initServer(MPI_Comm intraComm,MPI_Comm interComm) 
     293   void CContext::initServer(MPI_Comm intraComm,MPI_Comm interComm, CContext* cxtClient /*= 0*/) 
    280294   { 
    281295     hasServer=true; 
    282296     server = new CContextServer(this,intraComm,interComm); 
     297     MPI_Comm intraCommClient, interCommClient; 
     298     if (cxtClient) // Attached mode 
     299     { 
     300       intraCommClient = intraComm; 
     301       interCommClient = interComm; 
     302     } 
     303     else 
     304     { 
     305       MPI_Comm_dup(intraComm, &intraCommClient); 
     306       MPI_Comm_dup(interComm, &interCommClient); 
     307     } 
     308     client = new CContextClient(this,intraCommClient,interCommClient, cxtClient); 
     309     // Do something clever instead 
     310     std::map<int, StdSize> bufferSize; 
     311     for (int r = 0; r < client->serverSize; r++) 
     312       bufferSize[r] = 10 * sizeof(size_t) * 1024; 
     313     client->setBufferSize(bufferSize); 
    283314   } 
    284315 
     
    289320   } 
    290321 
     322   //! Try to send the buffers and receive possible answers 
     323   bool CContext::checkBuffersAndListen(void) 
     324   { 
     325     client->checkBuffers(); 
     326     return server->eventLoop(); 
     327   } 
     328 
    291329   //! Terminate a context 
    292330   void CContext::finalize(void) 
    293331   { 
    294       if (hasClient && !hasServer) 
    295       { 
    296          client->finalize(); 
    297       } 
    298       if (hasServer) 
    299       { 
    300         closeAllFile(); 
     332      if (!finalized) 
     333      { 
     334        finalized = true; 
     335 
     336        client->finalize(); 
     337        while (!server->hasFinished()) 
     338        { 
     339          server->eventLoop(); 
     340        } 
     341 
     342        if (hasServer) 
     343        { 
     344          closeAllFile(); 
     345        } 
    301346      } 
    302347   } 
  • XIOS/trunk/src/node/context.hpp

    r593 r597  
    8787      public : 
    8888         // Initialize server or client 
    89          void initServer(MPI_Comm intraComm, MPI_Comm interComm); 
    90          void initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer=0); 
     89         void initServer(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtClient = 0); 
     90         void initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext* cxtServer = 0); 
    9191         bool isInitialized(void); 
    9292 
    9393         // Put sever or client into loop state 
    9494         bool eventLoop(void); 
    95          bool serverLoop(void); 
    96          void clientLoop(void); 
     95 
     96         bool checkBuffersAndListen(void); 
    9797 
    9898         // Finalize a context 
     
    210210      private: 
    211211         bool isPostProcessed; 
     212         bool finalized; 
    212213         std::map<int, StdSize> dataSize_; 
    213214         StdString idServer_; 
  • XIOS/trunk/src/server.cpp

    r591 r597  
    2525    bool CServer::finished=false ; 
    2626    bool CServer::is_MPI_Initialized ; 
    27     CEventScheduler* CServer::eventScheduler ; 
     27    CEventScheduler* CServer::eventScheduler = 0; 
    2828 
    2929    void CServer::initialize(void) 
     
    392392       for(it=contextList.begin();it!=contextList.end();it++) 
    393393       { 
    394          finished=it->second->eventLoop() ; 
     394         finished=it->second->checkBuffersAndListen(); 
    395395         if (finished) 
    396396         { 
Note: See TracChangeset for help on using the changeset viewer.