source: XIOS/dev/dev_trunk_omp/src/cxios.cpp @ 1646

Last change on this file since 1646 was 1646, checked in by yushan, 5 years ago

branch merged with trunk @1645. arch file (ep&mpi) added for ADA

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 10.5 KB
Line 
1
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "client.hpp"
5#include "server.hpp"
6#include "xml_parser.hpp"
7#include <boost/functional/hash.hpp>
8#include "mpi.hpp"
9#include "memory.hpp"
10#include <new>
11#include "memtrack.hpp"
12#include "registry.hpp"
13#include "timer.hpp"
14#ifdef _usingEP
15using namespace ep_lib;
16#endif
17
18namespace xios
19{
20  const string CXios::rootFile="./iodef.xml" ;
21  const string CXios::xiosCodeId="xios.x" ;
22  const string CXios::clientFile="./xios_client";
23  const string CXios::serverFile="./xios_server";
24  const string CXios::serverPrmFile="./xios_server1";
25  const string CXios::serverSndFile="./xios_server2";
26
27  bool CXios::xiosStack = true;
28  bool CXios::systemStack = false;
29
30  bool CXios::isClient ;
31  bool CXios::isServer ;
32  MPI_Comm CXios::globalComm ;
33  bool CXios::usingOasis ;
34  bool CXios::usingServer = false;
35  bool CXios::usingServer2 = false;
36  int CXios::ratioServer2 = 50;
37  int CXios::nbPoolsServer2 = 1;
38  double CXios::bufferSizeFactor = 1.0;
39  const double CXios::defaultBufferSizeFactor = 1.0;
40  StdSize CXios::minBufferSize = 1024 * sizeof(double);
41  StdSize CXios::maxBufferSize = std::numeric_limits<int>::max() ;
42  bool CXios::printLogs2Files;
43  bool CXios::isOptPerformance = true;
44  CRegistry* CXios::globalRegistry = 0;
45  double CXios::recvFieldTimeout = 300.0;
46  bool CXios::checkEventSync=false ;
47 
48  //! Parse configuration file and create some objects from it
49  void CXios::initialize()
50  {
51    set_new_handler(noMemory);
52    int tmp_rank;
53    MPI_Comm_rank(MPI_COMM_WORLD, &tmp_rank);
54    #pragma omp critical
55    {
56      //std::cout<<"thread "<<tmp_rank<<"("<<omp_get_thread_num()<<")"<<" parsing rootfile"<<std::endl;
57      parseFile(rootFile);
58      std::cout<<"thread "<<tmp_rank<<"("<<omp_get_thread_num()<<")"<<" parsed rootfile"<<std::endl;
59    }
60    #pragma omp barrier
61    parseXiosConfig();
62  }
63
64  /*!
65  \brief Parse xios part of configuration file (.iodef.xml)
66   Both client and server need information returned from this function
67  */
68  void CXios::parseXiosConfig()
69  {
70    usingOasis=getin<bool>("using_oasis",false) ;
71    usingServer=getin<bool>("using_server",false) ;
72    usingServer2=getin<bool>("using_server2",false) ;
73    ratioServer2=getin<int>("ratio_server2",50);
74    nbPoolsServer2=getin<int>("number_pools_server2",0);
75    info.setLevel(getin<int>("info_level",0)) ;
76    report.setLevel(getin<int>("info_level",50));
77    printLogs2Files=getin<bool>("print_file",false);
78
79    xiosStack=getin<bool>("xios_stack",true) ;
80    systemStack=getin<bool>("system_stack",false) ;
81    if (xiosStack && systemStack)
82    {
83      xiosStack = false;
84    }
85
86    StdString bufMemory("memory");
87    StdString bufPerformance("performance");
88    StdString bufOpt = getin<StdString>("optimal_buffer_size", bufPerformance);
89    std::transform(bufOpt.begin(), bufOpt.end(), bufOpt.begin(), ::tolower);
90    if (0 == bufOpt.compare(bufMemory)) isOptPerformance = false;
91    else if (0 != bufOpt.compare(bufPerformance))
92    {
93      ERROR("CXios::parseXiosConfig()", << "optimal_buffer_size must be memory or performance "<< endl );
94    }
95
96    bufferSizeFactor = getin<double>("buffer_size_factor", defaultBufferSizeFactor);
97    minBufferSize = getin<int>("min_buffer_size", 1024 * sizeof(double));
98    maxBufferSize = getin<int>("max_buffer_size", std::numeric_limits<int>::max());
99    recvFieldTimeout = getin<double>("recv_field_timeout", recvFieldTimeout);
100    if (recvFieldTimeout < 0.0)
101      ERROR("CXios::parseXiosConfig()", "recv_field_timeout cannot be negative.");
102
103    checkEventSync = getin<bool>("check_event_sync", checkEventSync);
104
105    #ifdef _usingMPI
106    globalComm=MPI_COMM_WORLD ;
107    #elif _usingEP
108    int num_ep;
109
110    if(isClient)  num_ep = omp_get_num_threads();
111    if(isServer) num_ep = 1;
112       
113    MPI_Info info;
114    #pragma omp master
115    {
116      MPI_Comm *ep_comm;
117      MPI_Comm_create_endpoints(MPI_COMM_WORLD->mpi_comm, num_ep, info, ep_comm);  // servers should reach here too.
118      passage = ep_comm; 
119    }
120       
121    #pragma omp barrier
122    CXios::globalComm = passage[omp_get_thread_num()];
123    #endif
124  }
125
126  /*!
127  Initialize client
128  \param [in] codeId identity of context
129  \param [in] localComm local communicator
130  \param [in/out] returnComm communicator corresponding to group of client with same codeId
131  */
132  void CXios::initClientSide(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm)
133  TRY
134  {
135    isClient = true;
136    isServer = false;
137
138    initialize() ;
139
140    CClient::initialize(codeId,localComm,returnComm) ;
141    if (CClient::getRank()==0) globalRegistry = new CRegistry(returnComm) ;
142
143    // If there are no server processes then we are in attached mode
144    // and the clients are also servers
145    isServer = !usingServer;
146
147    if (printLogs2Files)
148    {
149      #pragma omp critical
150      CClient::openInfoStream(clientFile);
151      CClient::openErrorStream(clientFile);
152    }
153    else
154    {
155      CClient::openInfoStream();
156      CClient::openErrorStream();
157    }
158  }
159  CATCH
160
161  void CXios::clientFinalize(void)
162  {
163     CClient::finalize() ;
164     if (CClient::getRank()==0)
165     {
166       #pragma omp critical (_output)
167       {
168        info(80)<<"Write data base Registry"<<endl<<globalRegistry->toString()<<endl ;
169       }
170       globalRegistry->toFile("xios_registry.bin") ;
171       delete globalRegistry ;
172     }
173
174#ifdef XIOS_MEMTRACK
175
176#ifdef XIOS_MEMTRACK_LIGHT
177       report(10) << " Memory report : current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
178       report(10) << " Memory report : maximum memory used by XIOS : "<<  MemTrack::getMaxMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
179#endif
180
181#ifdef XIOS_MEMTRACK_FULL
182     MemTrack::TrackListMemoryUsage() ;
183     MemTrack::TrackDumpBlocks();
184#endif
185
186     CClient::closeInfoStream();
187
188#endif
189  }
190
191  //! Init server by parsing only xios part of config file
192  void CXios::initServer()
193  {
194    set_new_handler(noMemory);
195    std::set<StdString> parseList;
196    parseList.insert("xios");
197    xml::CXMLParser::ParseFile(rootFile, parseList);
198    parseXiosConfig();
199  }
200
201  //! Initialize server then put it into listening state
202  void CXios::initServerSide(void)
203  {
204
205    isClient = false;
206    isServer = true;
207
208    initServer();
209
210    // Initialize all aspects MPI
211    CServer::initialize();
212    if (CServer::getRank()==0 && CServer::serverLevel != 1) globalRegistry = new CRegistry(CServer::intraComm) ;
213   
214    if (printLogs2Files)
215    {
216      if (CServer::serverLevel == 0)
217      {
218        CServer::openInfoStream(serverFile);
219        CServer::openErrorStream(serverFile);
220      }
221      else if (CServer::serverLevel == 1)
222      {
223        CServer::openInfoStream(serverPrmFile);
224        CServer::openErrorStream(serverPrmFile);
225      }
226      else
227      {
228        CServer::openInfoStream(serverSndFile);
229        CServer::openErrorStream(serverSndFile);
230      }
231    }
232    else
233    {
234      CServer::openInfoStream();
235      CServer::openErrorStream();
236    }
237
238    // Enter the loop to listen message from Client
239    CServer::eventLoop();
240
241    // Finalize
242    if (CServer::serverLevel == 0)
243    {
244      if (CServer::getRank()==0)
245      {
246        #pragma omp critical (_output)
247        {
248          info(80)<<"Write data base Registry"<<endl<<globalRegistry->toString()<<endl ;
249        }
250        globalRegistry->toFile("xios_registry.bin") ;
251        delete globalRegistry ;
252      }
253    }
254    else
255    {
256      // If using two server levels:
257      // (1) merge registries on each pool
258      // (2) send merged registries to the first pool
259      // (3) merge received registries on the first pool
260      if (CServer::serverLevel == 2)
261      {
262        vector<int>& secondaryServerGlobalRanks = CServer::getSecondaryServerGlobalRanks();
263        int firstPoolGlobalRank = secondaryServerGlobalRanks[0];
264        int rankGlobal;
265        MPI_Comm_rank(globalComm, &rankGlobal);
266
267        // Merge registries defined on each pools
268        CRegistry globalRegistrySndServers (CServer::intraComm);
269
270        // All pools (except the first): send globalRegistry to the first pool
271        for (int i=1; i<secondaryServerGlobalRanks.size(); i++)
272        {
273          if (rankGlobal == secondaryServerGlobalRanks[i])
274          {
275            globalRegistrySndServers.mergeRegistry(*globalRegistry) ;
276            int registrySize = globalRegistrySndServers.size();
277            MPI_Send(&registrySize,1,MPI_LONG,firstPoolGlobalRank,15,CXios::globalComm) ;
278            CBufferOut buffer(registrySize) ;
279            globalRegistrySndServers.toBuffer(buffer) ;
280            MPI_Send(buffer.start(),registrySize,MPI_CHAR,firstPoolGlobalRank,15,CXios::globalComm) ;
281          }
282        }
283
284        // First pool: receive globalRegistry of all secondary server pools, merge and write the resultant registry
285        if (rankGlobal == firstPoolGlobalRank)
286        {
287          MPI_Status status;
288          char* recvBuff;
289
290          globalRegistrySndServers.mergeRegistry(*globalRegistry) ;
291
292          for (int i=1; i< secondaryServerGlobalRanks.size(); i++)
293          {
294            int rank = secondaryServerGlobalRanks[i];
295            int registrySize = 0;
296            MPI_Recv(&registrySize, 1, MPI_LONG, rank, 15, CXios::globalComm, &status);
297            recvBuff = new char[registrySize];
298            MPI_Recv(recvBuff, registrySize, MPI_CHAR, rank, 15, CXios::globalComm, &status);
299            CBufferIn buffer(recvBuff, registrySize) ;
300            CRegistry recvRegistry;
301            recvRegistry.fromBuffer(buffer) ;
302            globalRegistrySndServers.mergeRegistry(recvRegistry) ;
303            delete[] recvBuff;
304          }
305
306          #pragma omp critical (_output)
307          {
308            info(80)<<"Write data base Registry"<<endl<<globalRegistrySndServers.toString()<<endl ;
309          }
310          globalRegistrySndServers.toFile("xios_registry.bin") ;
311
312        }
313      }
314      delete globalRegistry;
315    }
316    CTimer::get("XIOS").suspend() ;
317    CServer::finalize();
318
319#ifdef XIOS_MEMTRACK
320
321#ifdef XIOS_MEMTRACK_LIGHT
322       report(10) << " Memory report : current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
323       report(10) << " Memory report : maximum memory used by XIOS : "<<  MemTrack::getMaxMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
324#endif
325
326#ifdef XIOS_MEMTRACK_FULL
327     MemTrack::TrackListMemoryUsage() ;
328     MemTrack::TrackDumpBlocks();
329#endif
330#endif
331    CServer::closeInfoStream();
332  }
333
334  //! Parse configuration file
335  void CXios::parseFile(const string& filename)
336  {
337    xml::CXMLParser::ParseFile(filename);
338  }
339
340  //! Set using server
341  void CXios::setUsingServer()
342  {
343    usingServer = true;
344  }
345
346  //! Unset using server
347  void CXios::setNotUsingServer()
348  {
349    usingServer = false;
350  }
351}
Note: See TracBrowser for help on using the repository browser.