source: XIOS/dev/dev_ym/XIOS_COUPLING/src/server.cpp @ 2265

Last change on this file since 2265 was 2265, checked in by ymipsl, 6 months ago

tracking memory leak : remove context and all related object from object factory when context is finalized.
YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 13.9 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "client.hpp"
6#include "type.hpp"
7#include "context.hpp"
8#include "object_template.hpp"
9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "event_scheduler.hpp"
16#include "string_tools.hpp"
17#include "ressources_manager.hpp"
18#include "services_manager.hpp"
19#include "contexts_manager.hpp"
20#include "servers_ressource.hpp"
21#include <cstdio>
22#include "workflow_graph.hpp"
23
24
25namespace xios
26{
27    MPI_Comm CServer::intraComm ;
28    MPI_Comm CServer::serversComm_ ;
29    std::list<MPI_Comm> CServer::interCommLeft ;
30    std::list<MPI_Comm> CServer::interCommRight ;
31    std::list<MPI_Comm> CServer::contextInterComms;
32    std::list<MPI_Comm> CServer::contextIntraComms;
33    int CServer::serverLevel = 0 ;
34    int CServer::nbContexts = 0;
35    bool CServer::isRoot = false ;
36    int CServer::rank_ = INVALID_RANK;
37    StdOFStream CServer::m_infoStream;
38    StdOFStream CServer::m_errorStream;
39    map<string,CContext*> CServer::contextList ;
40    vector<int> CServer::sndServerGlobalRanks;
41    bool CServer::finished=false ;
42    bool CServer::is_MPI_Initialized ;
43    CEventScheduler* CServer::eventScheduler = 0;
44    CServersRessource* CServer::serversRessource_=nullptr ;
45
46       
47    void CServer::initialize(void)
48    {
49     
50      MPI_Comm serverComm ;
51      int initialized ;
52      MPI_Initialized(&initialized) ;
53      if (initialized) is_MPI_Initialized=true ;
54      else is_MPI_Initialized=false ;
55      MPI_Comm globalComm=CXios::getGlobalComm() ;
56
57      /////////////////////////////////////////
58      ///////////// PART 1 ////////////////////
59      /////////////////////////////////////////
60      CTimer::get("XIOS").resume() ;
61      CTimer::get("XIOS initialize").resume() ;
62      // don't use OASIS
63      if (!CXios::usingOasis)
64      {
65        if (!is_MPI_Initialized) MPI_Init(NULL, NULL);
66       
67        // split the global communicator
68        // get hash from all model to attribute a unique color (int) and then split to get client communicator
69        // every mpi process of globalComm (MPI_COMM_WORLD) must participate
70         
71        int commRank, commSize ;
72        MPI_Comm_rank(globalComm,&commRank) ;
73        MPI_Comm_size(globalComm,&commSize) ;
74
75        std::hash<string> hashString ;
76        size_t hashServer=hashString(CXios::xiosCodeId) ;
77         
78        size_t* hashAll = new size_t[commSize] ;
79        MPI_Allgather(&hashServer,1,MPI_SIZE_T,hashAll,1,MPI_SIZE_T,globalComm) ;
80         
81        int color=0 ;
82        map<size_t,int> listHash ;
83        for(int i=0 ; i<=commSize ; i++) 
84          if (listHash.count(hashAll[i])==0) 
85          {
86            listHash[hashAll[i]]=color ;
87            color=color+1 ;
88          }
89        color=listHash[hashServer] ;
90        delete[] hashAll ;
91
92        MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
93      }
94      else // using OASIS
95      {
96        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
97
98        oasis_get_localcomm(serverComm);
99      }
100 
101      /////////////////////////////////////////
102      ///////////// PART 2 ////////////////////
103      /////////////////////////////////////////
104     
105
106      // Create the XIOS communicator for every process which is related
107      // to XIOS, as well on client side as on server side
108      MPI_Comm xiosGlobalComm ;
109      string strIds=CXios::getin<string>("clients_code_id","") ;
110      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
111      if (strIds.empty())
112      {
113        // no code Ids given, suppose XIOS initialisation is global           
114        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
115        MPI_Comm splitComm,interComm ;
116        MPI_Comm_rank(globalComm,&commGlobalRank) ;
117        MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
118        MPI_Comm_rank(splitComm,&commRank) ;
119        if (commRank==0) serverLeader=commGlobalRank ;
120        else serverLeader=0 ;
121        clientLeader=0 ;
122        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
123        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
124        MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
125        MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
126        CXios::setXiosComm(xiosGlobalComm) ;
127      }
128      else
129      {
130
131        xiosGlobalCommByFileExchange(serverComm) ;
132
133      }
134     
135      /////////////////////////////////////////
136      ///////////// PART 4 ////////////////////
137      //  create servers intra communicator  //
138      /////////////////////////////////////////
139     
140      int commRank ;
141      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
142      MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
143     
144      CXios::setUsingServer() ;
145
146      /////////////////////////////////////////
147      ///////////// PART 5 ////////////////////
148      //       redirect files output         //
149      /////////////////////////////////////////
150     
151      CServer::openInfoStream(CXios::serverFile);
152      CServer::openErrorStream(CXios::serverFile);
153
154      /////////////////////////////////////////
155      ///////////// PART 4 ////////////////////
156      /////////////////////////////////////////
157
158      CXios::launchDaemonsManager(true) ;
159     
160      /////////////////////////////////////////
161      ///////////// PART 5 ////////////////////
162      /////////////////////////////////////////
163
164      // create the services
165
166      auto ressourcesManager=CXios::getRessourcesManager() ;
167      auto servicesManager=CXios::getServicesManager() ;
168      auto contextsManager=CXios::getContextsManager() ;
169      auto daemonsManager=CXios::getDaemonsManager() ;
170      auto serversRessource=CServer::getServersRessource() ;
171
172      if (serversRessource->isServerLeader())
173      {
174        int nbRessources = ressourcesManager->getRessourcesSize() ;
175        if (!CXios::usingServer2)
176        {
177          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
178          servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::IO_SERVER,nbRessources,1) ;
179        }
180        else
181        {
182          int nprocsServer = nbRessources*CXios::ratioServer2/100.;
183          int nprocsGatherer = nbRessources - nprocsServer ;
184         
185          int nbPoolsServer2 = CXios::nbPoolsServer2 ;
186          if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
187          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
188          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
189          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServerId, CServicesManager::OUT_SERVER, nprocsServer, nbPoolsServer2) ;
190        }
191      }
192      CTimer::get("XIOS initialize").suspend() ;
193
194      /////////////////////////////////////////
195      ///////////// PART 5 ////////////////////
196      /////////////////////////////////////////
197      // loop on event loop
198
199      bool finished=false ;
200      CTimer::get("XIOS event loop").resume() ;
201
202      while (!finished)
203      {
204        finished=daemonsManager->eventLoop() ;
205      }
206      CTimer::get("XIOS event loop").suspend() ;
207
208      // Delete CContext
209      CObjectTemplate<CContext>::cleanStaticDataStructure();
210    }
211
212
213
214
215
216    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
217    {
218       
219      MPI_Comm globalComm=CXios::getGlobalComm() ;
220      MPI_Comm xiosGlobalComm ;
221     
222      string strIds=CXios::getin<string>("clients_code_id","") ;
223      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
224     
225      int commRank, globalRank ;
226      MPI_Comm_rank(serverComm, &commRank) ;
227      MPI_Comm_rank(globalComm, &globalRank) ;
228      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
229
230      if (commRank==0) // if root process publish name
231      { 
232        std::ofstream ofs (serverFileName, std::ofstream::out);
233        ofs<<globalRank ;
234        ofs.close();
235      }
236       
237      vector<int> clientsRank(clientsCodeId.size()) ;
238      for(int i=0;i<clientsRank.size();i++)
239      {
240        std::ifstream ifs ;
241        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
242        do
243        {
244          ifs.clear() ;
245          ifs.open(fileName, std::ifstream::in) ;
246        } while (ifs.fail()) ;
247        ifs>>clientsRank[i] ;
248        ifs.close() ; 
249      }
250
251      MPI_Comm intraComm ;
252      MPI_Comm_dup(serverComm,&intraComm) ;
253      MPI_Comm interComm ;
254      for(int i=0 ; i<clientsRank.size(); i++)
255      { 
256        MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
257        MPI_Comm_free(&intraComm) ;
258        MPI_Intercomm_merge(interComm,false, &intraComm ) ;
259      }
260      xiosGlobalComm=intraComm ; 
261      MPI_Barrier(xiosGlobalComm);
262      if (commRank==0) std::remove(serverFileName.c_str()) ;
263      MPI_Barrier(xiosGlobalComm);
264
265      CXios::setXiosComm(xiosGlobalComm) ;
266     
267    }
268
269
270    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
271    {
272        // untested, need to be tested on a true MPI-2 compliant library
273
274        // try to discover other client/server
275/*
276        // publish server name
277        char portName[MPI_MAX_PORT_NAME];
278        int ierr ;
279        int commRank ;
280        MPI_Comm_rank(serverComm, &commRank) ;
281       
282        if (commRank==0) // if root process publish name
283        { 
284          MPI_Open_port(MPI_INFO_NULL, portName);
285          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
286        }
287
288        MPI_Comm intraComm=serverComm ;
289        MPI_Comm interComm ;
290        for(int i=0 ; i<clientsCodeId.size(); i++)
291        { 
292          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
293          MPI_Intercomm_merge(interComm,false, &intraComm ) ;
294        }
295*/     
296    }
297
298
299    void CServer::finalize(void)
300    {
301      CTimer::get("XIOS").suspend() ;
302     
303      delete eventScheduler ;
304
305      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
306        MPI_Comm_free(&(*it));
307
308      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
309        MPI_Comm_free(&(*it));
310
311        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
312          MPI_Comm_free(&(*it));
313
314//      MPI_Comm_free(&intraComm);
315      CContext::removeAllContexts() ; // free memory for related context
316      CXios::finalizeDaemonsManager();
317     
318      if (!is_MPI_Initialized)
319      {
320        if (CXios::usingOasis) oasis_finalize();
321        else MPI_Finalize() ;
322      }
323      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
324      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
325      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
326      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
327
328      CWorkflowGraph::drawWorkFlowGraph_server();
329    }
330
331    /*!
332    * Open a file specified by a suffix and an extension and use it for the given file buffer.
333    * The file name will be suffix+rank+extension.
334    *
335    * \param fileName[in] protype file name
336    * \param ext [in] extension of the file
337    * \param fb [in/out] the file buffer
338    */
339    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
340    {
341      StdStringStream fileNameServer;
342      int numDigit = 0;
343      int commSize = 0;
344      int commRank ;
345      int id;
346     
347      MPI_Comm_size(CXios::getGlobalComm(), &commSize);
348      MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
349
350      while (commSize)
351      {
352        commSize /= 10;
353        ++numDigit;
354      }
355      id = commRank;
356
357      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
358      fb->open(fileNameServer.str().c_str(), std::ios::out);
359      if (!fb->is_open())
360        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
361              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
362    }
363
364    /*!
365    * \brief Open a file stream to write the info logs
366    * Open a file stream with a specific file name suffix+rank
367    * to write the info logs.
368    * \param fileName [in] protype file name
369    */
370    void CServer::openInfoStream(const StdString& fileName)
371    {
372      std::filebuf* fb = m_infoStream.rdbuf();
373      openStream(fileName, ".out", fb);
374
375      info.write2File(fb);
376      report.write2File(fb);
377    }
378
379    //! Write the info logs to standard output
380    void CServer::openInfoStream()
381    {
382      info.write2StdOut();
383      report.write2StdOut();
384    }
385
386    //! Close the info logs file if it opens
387    void CServer::closeInfoStream()
388    {
389      if (m_infoStream.is_open()) m_infoStream.close();
390    }
391
392    /*!
393    * \brief Open a file stream to write the error log
394    * Open a file stream with a specific file name suffix+rank
395    * to write the error log.
396    * \param fileName [in] protype file name
397    */
398    void CServer::openErrorStream(const StdString& fileName)
399    {
400      std::filebuf* fb = m_errorStream.rdbuf();
401      openStream(fileName, ".err", fb);
402
403      error.write2File(fb);
404    }
405
406    //! Write the error log to standard error output
407    void CServer::openErrorStream()
408    {
409      error.write2StdErr();
410    }
411
412    //! Close the error log file if it opens
413    void CServer::closeErrorStream()
414    {
415      if (m_errorStream.is_open()) m_errorStream.close();
416    }
417
418    void CServer::launchServersRessource(MPI_Comm serverComm)
419    {
420      serversRessource_ = new CServersRessource(serverComm) ;
421    }
422}
Note: See TracBrowser for help on using the repository browser.