source: XIOS/dev/branch_openmp/src/cxios.cpp @ 1533

Last change on this file since 1533 was 1533, checked in by yushan, 6 years ago

save dev

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 10.2 KB
Line 
1
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "client.hpp"
5#include "server.hpp"
6#include "xml_parser.hpp"
7#include <boost/functional/hash.hpp>
8#include "mpi.hpp"
9#include "memory.hpp"
10#include <new>
11#include "memtrack.hpp"
12#include "registry.hpp"
13using namespace ep_lib;
14
15namespace xios
16{
17  const string CXios::rootFile="./iodef.xml" ;
18  const string CXios::xiosCodeId="xios.x" ;
19  const string CXios::clientFile="./xios_client";
20  const string CXios::serverFile="./xios_server";
21  const string CXios::serverPrmFile="./xios_server1";
22  const string CXios::serverSndFile="./xios_server2";
23
24  bool CXios::isClient ;
25  bool CXios::isServer ;
26  MPI_Comm CXios::globalComm ;
27  bool CXios::usingOasis ;
28  bool CXios::usingServer = false;
29  bool CXios::usingServer2 = false;
30  int CXios::ratioServer2 = 50;
31  int CXios::nbPoolsServer2 = 1;
32  double CXios::bufferSizeFactor = 1.0;
33  const double CXios::defaultBufferSizeFactor = 1.0;
34  StdSize CXios::minBufferSize = 1024 * sizeof(double);
35  StdSize CXios::maxBufferSize = std::numeric_limits<int>::max() ;
36  bool CXios::printLogs2Files;
37  bool CXios::isOptPerformance = true;
38  CRegistry* CXios::globalRegistry = 0;
39  double CXios::recvFieldTimeout = 300.0;
40  bool CXios::checkEventSync=false ;
41 
42  //! Parse configuration file and create some objects from it
43  void CXios::initialize()
44  {
45    set_new_handler(noMemory);
46    int tmp_rank;
47    MPI_Comm_rank(MPI_COMM_WORLD, &tmp_rank);
48    #pragma omp critical
49    {
50      std::cout<<"thread "<<tmp_rank<<"("<<omp_get_thread_num()<<")"<<" parsing rootfile"<<std::endl;
51      parseFile(rootFile);
52      std::cout<<"thread "<<tmp_rank<<"("<<omp_get_thread_num()<<")"<<" parsed rootfile"<<std::endl;
53    }
54    #pragma omp barrier
55    parseXiosConfig();
56  }
57
58  /*!
59  \brief Parse xios part of configuration file (.iodef.xml)
60   Both client and server need information returned from this function
61  */
62  void CXios::parseXiosConfig()
63  {
64    usingOasis=getin<bool>("using_oasis",false) ;
65    usingServer=getin<bool>("using_server",false) ;
66    usingServer2=getin<bool>("using_server2",false) ;
67    ratioServer2=getin<int>("ratio_server2",50);
68    nbPoolsServer2=getin<int>("number_pools_server2",1);
69    info.setLevel(getin<int>("info_level",0)) ;
70    report.setLevel(getin<int>("info_level",50));
71    printLogs2Files=getin<bool>("print_file",false);
72
73    StdString bufMemory("memory");
74    StdString bufPerformance("performance");
75    StdString bufOpt = getin<StdString>("optimal_buffer_size", bufPerformance);
76    std::transform(bufOpt.begin(), bufOpt.end(), bufOpt.begin(), ::tolower);
77    if (0 == bufOpt.compare(bufMemory)) isOptPerformance = false;
78    else if (0 != bufOpt.compare(bufPerformance))
79    {
80      ERROR("CXios::parseXiosConfig()", << "optimal_buffer_size must be memory or performance "<< endl );
81    }
82
83    bufferSizeFactor = getin<double>("buffer_size_factor", defaultBufferSizeFactor);
84    minBufferSize = getin<int>("min_buffer_size", 1024 * sizeof(double));
85    maxBufferSize = getin<int>("max_buffer_size", std::numeric_limits<int>::max());
86    recvFieldTimeout = getin<double>("recv_field_timeout", recvFieldTimeout);
87    if (recvFieldTimeout < 0.0)
88      ERROR("CXios::parseXiosConfig()", "recv_field_timeout cannot be negative.");
89
90    checkEventSync = getin<bool>("check_event_sync", checkEventSync);
91
92    //globalComm=MPI_COMM_WORLD ;
93    int num_ep;
94    if(isClient) 
95    { 
96      num_ep = omp_get_num_threads();
97    }
98       
99    if(isServer) 
100    { 
101      num_ep = 1;
102    }
103       
104    MPI_Info info;
105    #pragma omp master
106    {
107      MPI_Comm *ep_comm;
108      MPI_Comm_create_endpoints(MPI_COMM_WORLD->mpi_comm, num_ep, info, ep_comm);  // servers should reach here too.
109      passage = ep_comm; 
110      EP_group_init();
111      //::MPI_Comm_group(to_mpi_comm(MPI_COMM_WORLD->mpi_comm), &MPI_GROUP_WORLD);
112    }
113       
114    #pragma omp barrier
115   
116         
117    CXios::globalComm = passage[omp_get_thread_num()];
118  }
119
120  /*!
121  Initialize client
122  \param [in] codeId identity of context
123  \param [in] localComm local communicator
124  \param [in/out] returnComm communicator corresponding to group of client with same codeId
125  */
126  void CXios::initClientSide(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm)
127  {
128    isClient = true;
129
130    initialize() ;
131
132    CClient::initialize(codeId,localComm,returnComm) ;
133    if (CClient::getRank()==0) globalRegistry = new CRegistry(returnComm) ;
134
135    // If there are no server processes then we are in attached mode
136    // and the clients are also servers
137    isServer = !usingServer;
138
139    if (printLogs2Files)
140    {
141      #pragma omp critical
142      CClient::openInfoStream(clientFile);
143      CClient::openErrorStream(clientFile);
144    }
145    else
146    {
147      CClient::openInfoStream();
148      CClient::openErrorStream();
149    }
150  }
151
152  void CXios::clientFinalize(void)
153  {
154     CClient::finalize() ;
155     if (CClient::getRank()==0)
156     {
157       #pragma omp critical (_output)
158       info(80)<<"Write data base Registry"<<endl<<globalRegistry->toString()<<endl ;
159       globalRegistry->toFile("xios_registry.bin") ;
160       delete globalRegistry ;
161     }
162
163#ifdef XIOS_MEMTRACK
164
165#ifdef XIOS_MEMTRACK_LIGHT
166       report(10) << " Memory report : current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
167       report(10) << " Memory report : maximum memory used by XIOS : "<<  MemTrack::getMaxMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
168#endif
169
170#ifdef XIOS_MEMTRACK_FULL
171     MemTrack::TrackListMemoryUsage() ;
172     MemTrack::TrackDumpBlocks();
173#endif
174
175     CClient::closeInfoStream();
176
177#endif
178  }
179
180  //! Init server by parsing only xios part of config file
181  void CXios::initServer()
182  {
183    set_new_handler(noMemory);
184    std::set<StdString> parseList;
185    parseList.insert("xios");
186    xml::CXMLParser::ParseFile(rootFile, parseList);
187    parseXiosConfig();
188  }
189
190  //! Initialize server then put it into listening state
191  void CXios::initServerSide(void)
192  {
193   
194    isClient = false;
195    isServer = true;
196
197    initServer();
198
199    // Initialize all aspects MPI
200    CServer::initialize();
201    if (CServer::getRank()==0 && CServer::serverLevel != 1) globalRegistry = new CRegistry(CServer::intraComm) ;
202   
203    if (printLogs2Files)
204    {
205      if (CServer::serverLevel == 0)
206      {
207        CServer::openInfoStream(serverFile);
208        CServer::openErrorStream(serverFile);
209      }
210      else if (CServer::serverLevel == 1)
211      {
212        CServer::openInfoStream(serverPrmFile);
213        CServer::openErrorStream(serverPrmFile);
214      }
215      else
216      {
217        CServer::openInfoStream(serverSndFile);
218        CServer::openErrorStream(serverSndFile);
219      }
220    }
221    else
222    {
223      CServer::openInfoStream();
224      CServer::openErrorStream();
225    }
226
227    // Enter the loop to listen message from Client
228    CServer::eventLoop();
229
230    // Finalize
231    if (CServer::serverLevel == 0)
232    {
233      if (CServer::getRank()==0)
234      {
235        info(80)<<"Write data base Registry"<<endl<<globalRegistry->toString()<<endl ;
236        globalRegistry->toFile("xios_registry.bin") ;
237        delete globalRegistry ;
238      }
239    }
240    else
241    {
242      // If using two server levels:
243      // (1) merge registries on each pool
244      // (2) send merged registries to the first pool
245      // (3) merge received registries on the first pool
246      if (CServer::serverLevel == 2)
247      {
248        vector<int>& secondaryServerGlobalRanks = CServer::getSecondaryServerGlobalRanks();
249        int firstPoolGlobalRank = secondaryServerGlobalRanks[0];
250        int rankGlobal;
251        MPI_Comm_rank(globalComm, &rankGlobal);
252
253        // Merge registries defined on each pools
254        CRegistry globalRegistrySndServers (CServer::intraComm);
255
256        // All pools (except the first): send globalRegistry to the first pool
257        for (int i=1; i<secondaryServerGlobalRanks.size(); i++)
258        {
259          if (rankGlobal == secondaryServerGlobalRanks[i])
260          {
261            globalRegistrySndServers.mergeRegistry(*globalRegistry) ;
262            int registrySize = globalRegistrySndServers.size();
263            MPI_Send(&registrySize,1,MPI_LONG,firstPoolGlobalRank,15,CXios::globalComm) ;
264            CBufferOut buffer(registrySize) ;
265            globalRegistrySndServers.toBuffer(buffer) ;
266            MPI_Send(buffer.start(),registrySize,MPI_CHAR,firstPoolGlobalRank,15,CXios::globalComm) ;
267          }
268        }
269
270        // First pool: receive globalRegistry of all secondary server pools, merge and write the resultant registry
271        if (rankGlobal == firstPoolGlobalRank)
272        {
273          MPI_Status status;
274          char* recvBuff;
275
276          globalRegistrySndServers.mergeRegistry(*globalRegistry) ;
277
278          for (int i=1; i< secondaryServerGlobalRanks.size(); i++)
279          {
280            int rank = secondaryServerGlobalRanks[i];
281            int registrySize = 0;
282            MPI_Recv(&registrySize, 1, MPI_LONG, rank, 15, CXios::globalComm, &status);
283            recvBuff = new char[registrySize];
284            MPI_Recv(recvBuff, registrySize, MPI_CHAR, rank, 15, CXios::globalComm, &status);
285            CBufferIn buffer(recvBuff, registrySize) ;
286            CRegistry recvRegistry;
287            recvRegistry.fromBuffer(buffer) ;
288            globalRegistrySndServers.mergeRegistry(recvRegistry) ;
289            delete[] recvBuff;
290          }
291
292          info(80)<<"Write data base Registry"<<endl<<globalRegistrySndServers.toString()<<endl ;
293          globalRegistrySndServers.toFile("xios_registry.bin") ;
294
295        }
296      }
297      delete globalRegistry;
298    }
299    CServer::finalize();
300
301#ifdef XIOS_MEMTRACK
302
303#ifdef XIOS_MEMTRACK_LIGHT
304       report(10) << " Memory report : current memory used by XIOS : "<<  MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
305       report(10) << " Memory report : maximum memory used by XIOS : "<<  MemTrack::getMaxMemorySize()*1.0/(1024*1024)<<" Mbyte" << endl ;
306#endif
307
308#ifdef XIOS_MEMTRACK_FULL
309     MemTrack::TrackListMemoryUsage() ;
310     MemTrack::TrackDumpBlocks();
311#endif
312#endif
313    CServer::closeInfoStream();
314  }
315
316  //! Parse configuration file
317  void CXios::parseFile(const string& filename)
318  {
319    xml::CXMLParser::ParseFile(filename);
320  }
321
322  //! Set using server
323  void CXios::setUsingServer()
324  {
325    usingServer = true;
326  }
327
328  //! Unset using server
329  void CXios::setNotUsingServer()
330  {
331    usingServer = false;
332  }
333}
Note: See TracBrowser for help on using the repository browser.