source: XIOS/dev/branch_openmp/src/cxios.cpp @ 1538

Last change on this file since 1538 was 1538, checked in by yushan, 6 years ago

tests in XIOS OK (client, complete, remap, toy)

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 10.1 KB
RevLine 
[300]1
[591]2#include "xios_spl.hpp"
[300]3#include "cxios.hpp"
[342]4#include "client.hpp"
5#include "server.hpp"
[346]6#include "xml_parser.hpp"
[300]7#include <boost/functional/hash.hpp>
[382]8#include "mpi.hpp"
[400]9#include "memory.hpp"
10#include <new>
[401]11#include "memtrack.hpp"
[697]12#include "registry.hpp"
[1328]13using namespace ep_lib;
[300]14
[335]15namespace xios
[300]16{
[1331]17  const string CXios::rootFile="./iodef.xml" ;
18  const string CXios::xiosCodeId="xios.x" ;
19  const string CXios::clientFile="./xios_client";
20  const string CXios::serverFile="./xios_server";
[1460]21  const string CXios::serverPrmFile="./xios_server1";
22  const string CXios::serverSndFile="./xios_server2";
[490]23
[300]24  bool CXios::isClient ;
25  bool CXios::isServer ;
26  MPI_Comm CXios::globalComm ;
27  bool CXios::usingOasis ;
[491]28  bool CXios::usingServer = false;
[1460]29  bool CXios::usingServer2 = false;
30  int CXios::ratioServer2 = 50;
31  int CXios::nbPoolsServer2 = 1;
[718]32  double CXios::bufferSizeFactor = 1.0;
33  const double CXios::defaultBufferSizeFactor = 1.0;
[719]34  StdSize CXios::minBufferSize = 1024 * sizeof(double);
[1460]35  StdSize CXios::maxBufferSize = std::numeric_limits<int>::max() ;
[523]36  bool CXios::printLogs2Files;
[511]37  bool CXios::isOptPerformance = true;
[697]38  CRegistry* CXios::globalRegistry = 0;
[1460]39  double CXios::recvFieldTimeout = 300.0;
40  bool CXios::checkEventSync=false ;
41 
[512]42  //! Parse configuration file and create some objects from it
[300]43  void CXios::initialize()
44  {
[400]45    set_new_handler(noMemory);
[1331]46    int tmp_rank;
47    MPI_Comm_rank(MPI_COMM_WORLD, &tmp_rank);
48    #pragma omp critical
49    {
50      std::cout<<"thread "<<tmp_rank<<"("<<omp_get_thread_num()<<")"<<" parsing rootfile"<<std::endl;
51      parseFile(rootFile);
52      std::cout<<"thread "<<tmp_rank<<"("<<omp_get_thread_num()<<")"<<" parsed rootfile"<<std::endl;
53    }
54    #pragma omp barrier
[511]55    parseXiosConfig();
56  }
57
[512]58  /*!
59  \brief Parse xios part of configuration file (.iodef.xml)
60   Both client and server need information returned from this function
61  */
[511]62  void CXios::parseXiosConfig()
63  {
[311]64    usingOasis=getin<bool>("using_oasis",false) ;
[506]65    usingServer=getin<bool>("using_server",false) ;
[1460]66    usingServer2=getin<bool>("using_server2",false) ;
67    ratioServer2=getin<int>("ratio_server2",50);
68    nbPoolsServer2=getin<int>("number_pools_server2",1);
[311]69    info.setLevel(getin<int>("info_level",0)) ;
[512]70    report.setLevel(getin<int>("info_level",50));
[523]71    printLogs2Files=getin<bool>("print_file",false);
[512]72
[511]73    StdString bufMemory("memory");
74    StdString bufPerformance("performance");
75    StdString bufOpt = getin<StdString>("optimal_buffer_size", bufPerformance);
76    std::transform(bufOpt.begin(), bufOpt.end(), bufOpt.begin(), ::tolower);
77    if (0 == bufOpt.compare(bufMemory)) isOptPerformance = false;
78    else if (0 != bufOpt.compare(bufPerformance))
79    {
80      ERROR("CXios::parseXiosConfig()", << "optimal_buffer_size must be memory or performance "<< endl );
81    }
82
[718]83    bufferSizeFactor = getin<double>("buffer_size_factor", defaultBufferSizeFactor);
[719]84    minBufferSize = getin<int>("min_buffer_size", 1024 * sizeof(double));
[1460]85    maxBufferSize = getin<int>("max_buffer_size", std::numeric_limits<int>::max());
86    recvFieldTimeout = getin<double>("recv_field_timeout", recvFieldTimeout);
[1029]87    if (recvFieldTimeout < 0.0)
88      ERROR("CXios::parseXiosConfig()", "recv_field_timeout cannot be negative.");
[718]89
[1460]90    checkEventSync = getin<bool>("check_event_sync", checkEventSync);
91
[1328]92    //globalComm=MPI_COMM_WORLD ;
[1134]93    int num_ep;
94    if(isClient) 
95    { 
96      num_ep = omp_get_num_threads();
97    }
[1328]98       
[1134]99    if(isServer) 
100    { 
[1331]101      num_ep = 1;
[1134]102    }
[1328]103       
[1134]104    MPI_Info info;
105    #pragma omp master
106    {
107      MPI_Comm *ep_comm;
[1520]108      MPI_Comm_create_endpoints(MPI_COMM_WORLD->mpi_comm, num_ep, info, ep_comm);  // servers should reach here too.
[1533]109      passage = ep_comm; 
110      //::MPI_Comm_group(to_mpi_comm(MPI_COMM_WORLD->mpi_comm), &MPI_GROUP_WORLD);
[1134]111    }
[1328]112       
113    #pragma omp barrier
[1134]114   
[1328]115         
[1134]116    CXios::globalComm = passage[omp_get_thread_num()];
[300]117  }
118
[512]119  /*!
120  Initialize client
121  \param [in] codeId identity of context
122  \param [in] localComm local communicator
123  \param [in/out] returnComm communicator corresponding to group of client with same codeId
124  */
[300]125  void CXios::initClientSide(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm)
126  {
[1134]127    isClient = true;
[1460]128
[300]129    initialize() ;
[490]130
[1134]131    CClient::initialize(codeId,localComm,returnComm) ;
[697]132    if (CClient::getRank()==0) globalRegistry = new CRegistry(returnComm) ;
[491]133
[956]134    // If there are no server processes then we are in attached mode
135    // and the clients are also servers
136    isServer = !usingServer;
[490]137
[523]138    if (printLogs2Files)
139    {
[1342]140      #pragma omp critical
[499]141      CClient::openInfoStream(clientFile);
[523]142      CClient::openErrorStream(clientFile);
143    }
[490]144    else
[523]145    {
[490]146      CClient::openInfoStream();
[523]147      CClient::openErrorStream();
148    }
[490]149  }
[300]150
151  void CXios::clientFinalize(void)
152  {
[490]153     CClient::finalize() ;
[697]154     if (CClient::getRank()==0)
155     {
[1342]156       #pragma omp critical (_output)
[697]157       info(80)<<"Write data base Registry"<<endl<<globalRegistry->toString()<<endl ;
158       globalRegistry->toFile("xios_registry.bin") ;
159       delete globalRegistry ;
160     }
[1160]161
[401]162#ifdef XIOS_MEMTRACK
[1156]163
164#ifdef XIOS_MEMTRACK_LIGHT
165       report(10) << " Memory report : current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
166       report(10) << " Memory report : maximum memory used by XIOS : "<<  MemTrack::getMaxMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
167#endif
168
169#ifdef XIOS_MEMTRACK_FULL
[401]170     MemTrack::TrackListMemoryUsage() ;
[490]171     MemTrack::TrackDumpBlocks();
[401]172#endif
[1161]173
[1205]174     CClient::closeInfoStream();
175
[1161]176#endif
[490]177  }
178
[512]179  //! Init server by parsing only xios part of config file
[509]180  void CXios::initServer()
181  {
182    set_new_handler(noMemory);
183    std::set<StdString> parseList;
184    parseList.insert("xios");
185    xml::CXMLParser::ParseFile(rootFile, parseList);
[511]186    parseXiosConfig();
[509]187  }
188
[512]189  //! Initialize server then put it into listening state
[300]190  void CXios::initServerSide(void)
191  {
[1134]192   
[956]193    isClient = false;
194    isServer = true;
[1460]195
[1134]196    initServer();
[1328]197
[491]198    // Initialize all aspects MPI
[490]199    CServer::initialize();
[1460]200    if (CServer::getRank()==0 && CServer::serverLevel != 1) globalRegistry = new CRegistry(CServer::intraComm) ;
[697]201   
[523]202    if (printLogs2Files)
203    {
[1460]204      if (CServer::serverLevel == 0)
205      {
206        CServer::openInfoStream(serverFile);
207        CServer::openErrorStream(serverFile);
208      }
209      else if (CServer::serverLevel == 1)
210      {
211        CServer::openInfoStream(serverPrmFile);
212        CServer::openErrorStream(serverPrmFile);
213      }
214      else
215      {
216        CServer::openInfoStream(serverSndFile);
217        CServer::openErrorStream(serverSndFile);
218      }
[523]219    }
[490]220    else
[523]221    {
[490]222      CServer::openInfoStream();
[523]223      CServer::openErrorStream();
224    }
[490]225
[491]226    // Enter the loop to listen message from Client
[490]227    CServer::eventLoop();
228
[491]229    // Finalize
[1460]230    if (CServer::serverLevel == 0)
231    {
232      if (CServer::getRank()==0)
233      {
234        info(80)<<"Write data base Registry"<<endl<<globalRegistry->toString()<<endl ;
235        globalRegistry->toFile("xios_registry.bin") ;
236        delete globalRegistry ;
237      }
238    }
239    else
240    {
241      // If using two server levels:
242      // (1) merge registries on each pool
243      // (2) send merged registries to the first pool
244      // (3) merge received registries on the first pool
245      if (CServer::serverLevel == 2)
246      {
247        vector<int>& secondaryServerGlobalRanks = CServer::getSecondaryServerGlobalRanks();
248        int firstPoolGlobalRank = secondaryServerGlobalRanks[0];
249        int rankGlobal;
250        MPI_Comm_rank(globalComm, &rankGlobal);
251
252        // Merge registries defined on each pools
253        CRegistry globalRegistrySndServers (CServer::intraComm);
254
255        // All pools (except the first): send globalRegistry to the first pool
256        for (int i=1; i<secondaryServerGlobalRanks.size(); i++)
257        {
258          if (rankGlobal == secondaryServerGlobalRanks[i])
259          {
260            globalRegistrySndServers.mergeRegistry(*globalRegistry) ;
261            int registrySize = globalRegistrySndServers.size();
262            MPI_Send(&registrySize,1,MPI_LONG,firstPoolGlobalRank,15,CXios::globalComm) ;
263            CBufferOut buffer(registrySize) ;
264            globalRegistrySndServers.toBuffer(buffer) ;
265            MPI_Send(buffer.start(),registrySize,MPI_CHAR,firstPoolGlobalRank,15,CXios::globalComm) ;
266          }
267        }
268
269        // First pool: receive globalRegistry of all secondary server pools, merge and write the resultant registry
270        if (rankGlobal == firstPoolGlobalRank)
271        {
272          MPI_Status status;
273          char* recvBuff;
274
275          globalRegistrySndServers.mergeRegistry(*globalRegistry) ;
276
277          for (int i=1; i< secondaryServerGlobalRanks.size(); i++)
278          {
279            int rank = secondaryServerGlobalRanks[i];
280            int registrySize = 0;
281            MPI_Recv(&registrySize, 1, MPI_LONG, rank, 15, CXios::globalComm, &status);
282            recvBuff = new char[registrySize];
283            MPI_Recv(recvBuff, registrySize, MPI_CHAR, rank, 15, CXios::globalComm, &status);
284            CBufferIn buffer(recvBuff, registrySize) ;
285            CRegistry recvRegistry;
286            recvRegistry.fromBuffer(buffer) ;
287            globalRegistrySndServers.mergeRegistry(recvRegistry) ;
288            delete[] recvBuff;
289          }
290
291          info(80)<<"Write data base Registry"<<endl<<globalRegistrySndServers.toString()<<endl ;
292          globalRegistrySndServers.toFile("xios_registry.bin") ;
293
294        }
295      }
296      delete globalRegistry;
297    }
[1205]298    CServer::finalize();
[1134]299
[1205]300#ifdef XIOS_MEMTRACK
301
302#ifdef XIOS_MEMTRACK_LIGHT
303       report(10) << " Memory report : current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
304       report(10) << " Memory report : maximum memory used by XIOS : "<<  MemTrack::getMaxMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
305#endif
306
307#ifdef XIOS_MEMTRACK_FULL
308     MemTrack::TrackListMemoryUsage() ;
309     MemTrack::TrackDumpBlocks();
310#endif
311#endif
[490]312    CServer::closeInfoStream();
313  }
314
[512]315  //! Parse configuration file
[346]316  void CXios::parseFile(const string& filename)
317  {
[490]318    xml::CXMLParser::ParseFile(filename);
[346]319  }
[491]320
[512]321  //! Set using server
[491]322  void CXios::setUsingServer()
323  {
324    usingServer = true;
325  }
326
[512]327  //! Unset using server
[491]328  void CXios::setNotUsingServer()
329  {
330    usingServer = false;
331  }
[300]332}
Note: See TracBrowser for help on using the repository browser.