source: XIOS/dev/dev_trunk_omp/src/cxios.cpp @ 1670

Last change on this file since 1670 was 1670, checked in by yushan, 5 years ago

MARK: branch merged with trunk @1663. Generate one static graph for each output file

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 10.6 KB
RevLine 
[300]1
[591]2#include "xios_spl.hpp"
[300]3#include "cxios.hpp"
[342]4#include "client.hpp"
5#include "server.hpp"
[346]6#include "xml_parser.hpp"
[300]7#include <boost/functional/hash.hpp>
[382]8#include "mpi.hpp"
[400]9#include "memory.hpp"
10#include <new>
[401]11#include "memtrack.hpp"
[697]12#include "registry.hpp"
[1646]13#include "timer.hpp"
14#ifdef _usingEP
[1601]15using namespace ep_lib;
[1646]16#endif
[1668]17#include "graphviz.hpp"
[335]18namespace xios
[300]19{
[1601]20  const string CXios::rootFile="./iodef.xml" ;
21  const string CXios::xiosCodeId="xios.x" ;
22  const string CXios::clientFile="./xios_client";
23  const string CXios::serverFile="./xios_server";
24  const string CXios::serverPrmFile="./xios_server1";
25  const string CXios::serverSndFile="./xios_server2";
[490]26
[1646]27  bool CXios::xiosStack = true;
28  bool CXios::systemStack = false;
29
[300]30  bool CXios::isClient ;
31  bool CXios::isServer ;
32  MPI_Comm CXios::globalComm ;
33  bool CXios::usingOasis ;
[491]34  bool CXios::usingServer = false;
[1021]35  bool CXios::usingServer2 = false;
36  int CXios::ratioServer2 = 50;
[1243]37  int CXios::nbPoolsServer2 = 1;
[718]38  double CXios::bufferSizeFactor = 1.0;
39  const double CXios::defaultBufferSizeFactor = 1.0;
[719]40  StdSize CXios::minBufferSize = 1024 * sizeof(double);
[1227]41  StdSize CXios::maxBufferSize = std::numeric_limits<int>::max() ;
[523]42  bool CXios::printLogs2Files;
[511]43  bool CXios::isOptPerformance = true;
[697]44  CRegistry* CXios::globalRegistry = 0;
[1375]45  double CXios::recvFieldTimeout = 300.0;
[1377]46  bool CXios::checkEventSync=false ;
[1376]47 
[512]48  //! Parse configuration file and create some objects from it
[300]49  void CXios::initialize()
50  {
[400]51    set_new_handler(noMemory);
[1601]52    int tmp_rank;
53    MPI_Comm_rank(MPI_COMM_WORLD, &tmp_rank);
54    #pragma omp critical
55    {
56      parseFile(rootFile);
57      std::cout<<"thread "<<tmp_rank<<"("<<omp_get_thread_num()<<")"<<" parsed rootfile"<<std::endl;
58    }
59    #pragma omp barrier
[511]60    parseXiosConfig();
61  }
62
[512]63  /*!
64  \brief Parse xios part of configuration file (.iodef.xml)
65   Both client and server need information returned from this function
66  */
[511]67  void CXios::parseXiosConfig()
68  {
[311]69    usingOasis=getin<bool>("using_oasis",false) ;
[506]70    usingServer=getin<bool>("using_server",false) ;
[1021]71    usingServer2=getin<bool>("using_server2",false) ;
72    ratioServer2=getin<int>("ratio_server2",50);
[1519]73    nbPoolsServer2=getin<int>("number_pools_server2",0);
[311]74    info.setLevel(getin<int>("info_level",0)) ;
[512]75    report.setLevel(getin<int>("info_level",50));
[523]76    printLogs2Files=getin<bool>("print_file",false);
[512]77
[1646]78    xiosStack=getin<bool>("xios_stack",true) ;
79    systemStack=getin<bool>("system_stack",false) ;
80    if (xiosStack && systemStack)
81    {
82      xiosStack = false;
83    }
84
[511]85    StdString bufMemory("memory");
86    StdString bufPerformance("performance");
87    StdString bufOpt = getin<StdString>("optimal_buffer_size", bufPerformance);
88    std::transform(bufOpt.begin(), bufOpt.end(), bufOpt.begin(), ::tolower);
89    if (0 == bufOpt.compare(bufMemory)) isOptPerformance = false;
90    else if (0 != bufOpt.compare(bufPerformance))
91    {
92      ERROR("CXios::parseXiosConfig()", << "optimal_buffer_size must be memory or performance "<< endl );
93    }
94
[718]95    bufferSizeFactor = getin<double>("buffer_size_factor", defaultBufferSizeFactor);
[719]96    minBufferSize = getin<int>("min_buffer_size", 1024 * sizeof(double));
[1227]97    maxBufferSize = getin<int>("max_buffer_size", std::numeric_limits<int>::max());
[1376]98    recvFieldTimeout = getin<double>("recv_field_timeout", recvFieldTimeout);
[1158]99    if (recvFieldTimeout < 0.0)
100      ERROR("CXios::parseXiosConfig()", "recv_field_timeout cannot be negative.");
[718]101
[1377]102    checkEventSync = getin<bool>("check_event_sync", checkEventSync);
103
[1665]104   
105    #ifdef _usingEP
[1601]106    int num_ep;
[1646]107
108    if(isClient)  num_ep = omp_get_num_threads();
109    if(isServer) num_ep = 1;
[1601]110       
111    MPI_Info info;
112    #pragma omp master
113    {
114      MPI_Comm *ep_comm;
115      MPI_Comm_create_endpoints(MPI_COMM_WORLD->mpi_comm, num_ep, info, ep_comm);  // servers should reach here too.
116      passage = ep_comm; 
117    }
118       
119    #pragma omp barrier
120    CXios::globalComm = passage[omp_get_thread_num()];
[1665]121
122    #else
123    globalComm=MPI_COMM_WORLD ;
124
[1646]125    #endif
[300]126  }
127
[512]128  /*!
129  Initialize client
130  \param [in] codeId identity of context
131  \param [in] localComm local communicator
132  \param [in/out] returnComm communicator corresponding to group of client with same codeId
133  */
[300]134  void CXios::initClientSide(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm)
[1646]135  TRY
[300]136  {
[1601]137    isClient = true;
138    isServer = false;
139
[300]140    initialize() ;
[490]141
[491]142    CClient::initialize(codeId,localComm,returnComm) ;
[697]143    if (CClient::getRank()==0) globalRegistry = new CRegistry(returnComm) ;
[491]144
[956]145    // If there are no server processes then we are in attached mode
146    // and the clients are also servers
147    isServer = !usingServer;
[490]148
[523]149    if (printLogs2Files)
150    {
[1601]151      #pragma omp critical
[499]152      CClient::openInfoStream(clientFile);
[523]153      CClient::openErrorStream(clientFile);
154    }
[490]155    else
[523]156    {
[490]157      CClient::openInfoStream();
[523]158      CClient::openErrorStream();
159    }
[490]160  }
[1646]161  CATCH
[300]162
163  void CXios::clientFinalize(void)
164  {
[490]165     CClient::finalize() ;
[697]166     if (CClient::getRank()==0)
167     {
[1601]168       #pragma omp critical (_output)
[1646]169       {
170        info(80)<<"Write data base Registry"<<endl<<globalRegistry->toString()<<endl ;
171       }
[697]172       globalRegistry->toFile("xios_registry.bin") ;
173       delete globalRegistry ;
[1668]174       
[1669]175       CGraphviz::showStaticWorkflowGraph();
[1668]176       CGraphviz::buildStaticWorkflowGraph();
[1669]177       
[697]178     }
[490]179
[1669]180
[401]181#ifdef XIOS_MEMTRACK
[1158]182
183#ifdef XIOS_MEMTRACK_LIGHT
184       report(10) << " Memory report : current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
185       report(10) << " Memory report : maximum memory used by XIOS : "<<  MemTrack::getMaxMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
186#endif
187
188#ifdef XIOS_MEMTRACK_FULL
[401]189     MemTrack::TrackListMemoryUsage() ;
[490]190     MemTrack::TrackDumpBlocks();
[401]191#endif
[1158]192
193     CClient::closeInfoStream();
194
195#endif
[490]196  }
197
[512]198  //! Init server by parsing only xios part of config file
[509]199  void CXios::initServer()
200  {
201    set_new_handler(noMemory);
202    std::set<StdString> parseList;
203    parseList.insert("xios");
204    xml::CXMLParser::ParseFile(rootFile, parseList);
[511]205    parseXiosConfig();
[509]206  }
207
[512]208  //! Initialize server then put it into listening state
[1054]209  void CXios::initServerSide(void)
[300]210  {
[1601]211
[1158]212    isClient = false;
213    isServer = true;
[1021]214
[1601]215    initServer();
216
[1021]217    // Initialize all aspects MPI
218    CServer::initialize();
[1330]219    if (CServer::getRank()==0 && CServer::serverLevel != 1) globalRegistry = new CRegistry(CServer::intraComm) ;
[697]220   
[523]221    if (printLogs2Files)
222    {
[1021]223      if (CServer::serverLevel == 0)
[983]224      {
225        CServer::openInfoStream(serverFile);
226        CServer::openErrorStream(serverFile);
227      }
[1021]228      else if (CServer::serverLevel == 1)
[983]229      {
[1021]230        CServer::openInfoStream(serverPrmFile);
231        CServer::openErrorStream(serverPrmFile);
[983]232      }
233      else
234      {
[1021]235        CServer::openInfoStream(serverSndFile);
236        CServer::openErrorStream(serverSndFile);
[983]237      }
[523]238    }
[490]239    else
[523]240    {
[490]241      CServer::openInfoStream();
[523]242      CServer::openErrorStream();
243    }
[490]244
[491]245    // Enter the loop to listen message from Client
[490]246    CServer::eventLoop();
247
[491]248    // Finalize
[1234]249    if (CServer::serverLevel == 0)
[1168]250    {
251      if (CServer::getRank()==0)
252      {
[1646]253        #pragma omp critical (_output)
254        {
255          info(80)<<"Write data base Registry"<<endl<<globalRegistry->toString()<<endl ;
256        }
[1168]257        globalRegistry->toFile("xios_registry.bin") ;
258        delete globalRegistry ;
259      }
260    }
261    else
262    {
[1243]263      // If using two server levels:
264      // (1) merge registries on each pool
265      // (2) send merged registries to the first pool
266      // (3) merge received registries on the first pool
[1168]267      if (CServer::serverLevel == 2)
268      {
269        vector<int>& secondaryServerGlobalRanks = CServer::getSecondaryServerGlobalRanks();
270        int firstPoolGlobalRank = secondaryServerGlobalRanks[0];
271        int rankGlobal;
272        MPI_Comm_rank(globalComm, &rankGlobal);
273
[1243]274        // Merge registries defined on each pools
275        CRegistry globalRegistrySndServers (CServer::intraComm);
276
[1168]277        // All pools (except the first): send globalRegistry to the first pool
[1243]278        for (int i=1; i<secondaryServerGlobalRanks.size(); i++)
[1168]279        {
[1243]280          if (rankGlobal == secondaryServerGlobalRanks[i])
281          {
282            globalRegistrySndServers.mergeRegistry(*globalRegistry) ;
283            int registrySize = globalRegistrySndServers.size();
284            MPI_Send(&registrySize,1,MPI_LONG,firstPoolGlobalRank,15,CXios::globalComm) ;
285            CBufferOut buffer(registrySize) ;
286            globalRegistrySndServers.toBuffer(buffer) ;
287            MPI_Send(buffer.start(),registrySize,MPI_CHAR,firstPoolGlobalRank,15,CXios::globalComm) ;
288          }
[1168]289        }
290
291        // First pool: receive globalRegistry of all secondary server pools, merge and write the resultant registry
[1243]292        if (rankGlobal == firstPoolGlobalRank)
[1168]293        {
294          MPI_Status status;
295          char* recvBuff;
296
297          globalRegistrySndServers.mergeRegistry(*globalRegistry) ;
298
299          for (int i=1; i< secondaryServerGlobalRanks.size(); i++)
300          {
301            int rank = secondaryServerGlobalRanks[i];
302            int registrySize = 0;
303            MPI_Recv(&registrySize, 1, MPI_LONG, rank, 15, CXios::globalComm, &status);
304            recvBuff = new char[registrySize];
305            MPI_Recv(recvBuff, registrySize, MPI_CHAR, rank, 15, CXios::globalComm, &status);
306            CBufferIn buffer(recvBuff, registrySize) ;
307            CRegistry recvRegistry;
308            recvRegistry.fromBuffer(buffer) ;
309            globalRegistrySndServers.mergeRegistry(recvRegistry) ;
310            delete[] recvBuff;
311          }
312
[1646]313          #pragma omp critical (_output)
314          {
315            info(80)<<"Write data base Registry"<<endl<<globalRegistrySndServers.toString()<<endl ;
316          }
[1168]317          globalRegistrySndServers.toFile("xios_registry.bin") ;
318
319        }
320      }
321      delete globalRegistry;
322    }
[1646]323    CTimer::get("XIOS").suspend() ;
[490]324    CServer::finalize();
[1158]325
326#ifdef XIOS_MEMTRACK
327
328#ifdef XIOS_MEMTRACK_LIGHT
329       report(10) << " Memory report : current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
330       report(10) << " Memory report : maximum memory used by XIOS : "<<  MemTrack::getMaxMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
331#endif
332
333#ifdef XIOS_MEMTRACK_FULL
334     MemTrack::TrackListMemoryUsage() ;
335     MemTrack::TrackDumpBlocks();
336#endif
337#endif
[490]338    CServer::closeInfoStream();
339  }
340
[512]341  //! Parse configuration file
[346]342  void CXios::parseFile(const string& filename)
343  {
[490]344    xml::CXMLParser::ParseFile(filename);
[346]345  }
[491]346
[512]347  //! Set using server
[491]348  void CXios::setUsingServer()
349  {
350    usingServer = true;
351  }
352
[512]353  //! Unset using server
[491]354  void CXios::setNotUsingServer()
355  {
356    usingServer = false;
357  }
[300]358}
[1670]359
Note: See TracBrowser for help on using the repository browser.