Ignore:
Timestamp:
06/13/22 17:04:55 (2 years ago)
Author:
jderouillat
Message:

Reintroduced oasis_enddef management on the server side (communications are operated on MPI_COMM_WORLD in Oasis). XIOS clients require to initilialize Oasis by themselves first, and then to specify their local_comm to xios_initialize. Coupled components must be specified through clients_code_id in iodef.xml

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/server.cpp

    r2332 r2333  
    1919#include "contexts_manager.hpp" 
    2020#include "servers_ressource.hpp" 
     21#include "services.hpp" 
    2122#include <cstdio> 
    2223#include "workflow_graph.hpp" 
    2324#include "release_static_allocation.hpp" 
     25#include <sys/stat.h> 
     26#include <unistd.h> 
    2427 
    2528 
     
    97100 
    98101        oasis_get_localcomm(serverComm); 
     102        MPI_Comm_dup(serverComm, &intraComm_); 
    99103      } 
    100104      CTimer::get("XIOS").resume() ; 
     
    171175      auto daemonsManager=CXios::getDaemonsManager() ; 
    172176      auto serversRessource=CServer::getServersRessource() ; 
     177 
     178      int rank; 
     179      MPI_Comm_rank(intraComm_, &rank) ; 
     180      if (rank==0) isRoot=true; 
     181      else isRoot=false; 
    173182 
    174183      if (serversRessource->isServerLeader()) 
     
    242251        std::ifstream ifs ; 
    243252        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ; 
    244         do 
    245         { 
    246           ifs.clear() ; 
    247           ifs.open(fileName, std::ifstream::in) ; 
    248         } while (ifs.fail()) ; 
     253        struct stat buffer; 
     254        do { 
     255        } while( stat(fileName.c_str(), &buffer) != 0 ); 
     256        sleep(1); 
     257        ifs.open(fileName, ifstream::in) ; 
    249258        ifs>>clientsRank[i] ; 
     259        //cout <<  "\t\t read: " << clientsRank[i] << " in " << fileName << endl; 
    250260        ifs.close() ;  
    251261      } 
     
    257267      {   
    258268        MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm); 
     269        interCommLeft.push_back(interComm) ; 
    259270        MPI_Comm_free(&intraComm) ; 
    260271        MPI_Intercomm_merge(interComm,false, &intraComm ) ; 
     
    298309    } 
    299310 
     311   /*! 
     312    * Root process is listening for an order sent by client to call "oasis_enddef". 
     313    * The root client of a compound send the order (tag 5). It is probed and received. 
     314    * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any). 
     315    * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done 
     316    */ 
     317     
     318     void CServer::listenOasisEnddef(void) 
     319     { 
     320        int flag ; 
     321        MPI_Status status ; 
     322        list<MPI_Comm>::iterator it; 
     323        int msg ; 
     324        static int nbCompound=0 ; 
     325        int size ; 
     326        static bool sent=false ; 
     327        static MPI_Request* allRequests ; 
     328        static MPI_Status* allStatus ; 
     329 
     330 
     331        if (sent) 
     332        { 
     333          MPI_Comm_size(intraComm_,&size) ; 
     334          MPI_Testall(size,allRequests, &flag, allStatus) ; 
     335          if (flag==true) 
     336          { 
     337            delete [] allRequests ; 
     338            delete [] allStatus ; 
     339            sent=false ; 
     340          } 
     341        } 
     342         
     343 
     344        for(it=interCommLeft.begin();it!=interCommLeft.end();it++) 
     345        { 
     346           MPI_Status status ; 
     347           traceOff() ; 
     348           MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5 
     349           traceOn() ; 
     350           if (flag==true) 
     351           { 
     352              MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5 
     353              nbCompound++ ; 
     354              if (nbCompound==interCommLeft.size()) 
     355              { 
     356                MPI_Comm_size(intraComm_,&size) ; 
     357                allRequests= new MPI_Request[size] ; 
     358                allStatus= new MPI_Status[size] ; 
     359                for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm_,&allRequests[i]) ; // tags oasis_endded = 5 
     360                sent=true ; 
     361              } 
     362           } 
     363        } 
     364} 
     365      
     366   /*! 
     367    * Processes probes message from root process if oasis_enddef call must be done. 
     368    * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator 
     369    */ 
     370     void CServer::listenRootOasisEnddef(void) 
     371     { 
     372       int flag ; 
     373       MPI_Status status ; 
     374       const int root=0 ; 
     375       int msg ; 
     376       static bool eventSent=false ; 
     377 
     378       if (eventSent) 
     379       { 
     380         boost::hash<string> hashString; 
     381         size_t hashId = hashString("oasis_enddef"); 
     382         if (CXios::getPoolRessource()->getService(CXios::defaultServerId,0)->getEventScheduler()->queryEvent(0,hashId)) 
     383         { 
     384           CXios::getPoolRessource()->getService(CXios::defaultServerId,0)->getEventScheduler()->popEvent() ; 
     385           oasis_enddef() ; 
     386           eventSent=false ; 
     387         } 
     388       } 
     389          
     390       traceOff() ; 
     391       MPI_Iprobe(root,5,intraComm_, &flag, &status) ; 
     392       traceOn() ; 
     393       if (flag==true) 
     394       { 
     395           MPI_Recv(&msg,1,MPI_INT,root,5,intraComm_,&status) ; // tags oasis_endded = 5 
     396           boost::hash<string> hashString; 
     397           size_t hashId = hashString("oasis_enddef"); 
     398           CXios::getPoolRessource()->getService(CXios::defaultServerId,0)->getEventScheduler()->registerEvent(0,hashId); 
     399           eventSent=true ; 
     400       } 
     401     } 
    300402 
    301403    void CServer::finalize(void) 
Note: See TracChangeset for help on using the changeset viewer.