source: XIOS/dev/dev_ym/XIOS_COUPLING/src/server.cpp @ 1878

Last change on this file since 1878 was 1765, checked in by ymipsl, 4 years ago

Some cleaning On XIOS services branch

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 13.4 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "client.hpp"
6#include "type.hpp"
7#include "context.hpp"
8#include "object_template.hpp"
9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "event_scheduler.hpp"
16#include "string_tools.hpp"
17#include "ressources_manager.hpp"
18#include "services_manager.hpp"
19#include "contexts_manager.hpp"
20#include "servers_ressource.hpp"
21#include <cstdio>
22
23
24
25namespace xios
26{
27    MPI_Comm CServer::intraComm ;
28    MPI_Comm CServer::serversComm_ ;
29    std::list<MPI_Comm> CServer::interCommLeft ;
30    std::list<MPI_Comm> CServer::interCommRight ;
31    std::list<MPI_Comm> CServer::contextInterComms;
32    std::list<MPI_Comm> CServer::contextIntraComms;
33    int CServer::serverLevel = 0 ;
34    int CServer::nbContexts = 0;
35    bool CServer::isRoot = false ;
36    int CServer::rank_ = INVALID_RANK;
37    StdOFStream CServer::m_infoStream;
38    StdOFStream CServer::m_errorStream;
39    map<string,CContext*> CServer::contextList ;
40    vector<int> CServer::sndServerGlobalRanks;
41    bool CServer::finished=false ;
42    bool CServer::is_MPI_Initialized ;
43    CEventScheduler* CServer::eventScheduler = 0;
44    CServersRessource* CServer::serversRessource_=nullptr ;
45
46       
47    void CServer::initialize(void)
48    {
49     
50      MPI_Comm serverComm ;
51      int initialized ;
52      MPI_Initialized(&initialized) ;
53      if (initialized) is_MPI_Initialized=true ;
54      else is_MPI_Initialized=false ;
55      MPI_Comm globalComm=CXios::getGlobalComm() ;
56
57      /////////////////////////////////////////
58      ///////////// PART 1 ////////////////////
59      /////////////////////////////////////////
60
61      // don't use OASIS
62      if (!CXios::usingOasis)
63      {
64        if (!is_MPI_Initialized) MPI_Init(NULL, NULL);
65       
66        // split the global communicator
67        // get hash from all model to attribute a unique color (int) and then split to get client communicator
68        // every mpi process of globalComm (MPI_COMM_WORLD) must participate
69         
70        int commRank, commSize ;
71        MPI_Comm_rank(globalComm,&commRank) ;
72        MPI_Comm_size(globalComm,&commSize) ;
73
74        std::hash<string> hashString ;
75        size_t hashServer=hashString(CXios::xiosCodeId) ;
76         
77        size_t* hashAll = new size_t[commSize] ;
78        MPI_Allgather(&hashServer,1,MPI_UNSIGNED_LONG,hashAll,1,MPI_LONG,globalComm) ;
79         
80        int color=0 ;
81        set<size_t> listHash ;
82        for(int i=0 ; i<=commRank ; i++) 
83          if (listHash.count(hashAll[i])==1)
84          {
85            listHash.insert(hashAll[i]) ;
86            color=color+1 ;
87          }
88        delete[] hashAll ;
89
90        MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
91      }
92      else // using OASIS
93      {
94        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
95
96        CTimer::get("XIOS").resume() ;
97        oasis_get_localcomm(serverComm);
98      }
99 
100      /////////////////////////////////////////
101      ///////////// PART 2 ////////////////////
102      /////////////////////////////////////////
103     
104
105      // Create the XIOS communicator for every process which is related
106      // to XIOS, as well on client side as on server side
107      MPI_Comm xiosGlobalComm ;
108      string strIds=CXios::getin<string>("clients_code_id","") ;
109      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
110      if (strIds.empty())
111      {
112        // no code Ids given, suppose XIOS initialisation is global           
113        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
114        MPI_Comm splitComm,interComm ;
115        MPI_Comm_rank(globalComm,&commGlobalRank) ;
116        MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
117        MPI_Comm_rank(splitComm,&commRank) ;
118        if (commRank==0) serverLeader=commGlobalRank ;
119        else serverLeader=0 ;
120        clientLeader=0 ;
121        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
122        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
123        MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
124        MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
125        CXios::setXiosComm(xiosGlobalComm) ;
126      }
127      else
128      {
129
130        xiosGlobalCommByFileExchange(serverComm) ;
131
132      }
133     
134      /////////////////////////////////////////
135      ///////////// PART 4 ////////////////////
136      //  create servers intra communicator  //
137      /////////////////////////////////////////
138     
139      int commRank ;
140      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
141      MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
142     
143      CXios::setUsingServer() ;
144
145      /////////////////////////////////////////
146      ///////////// PART 5 ////////////////////
147      //       redirect files output         //
148      /////////////////////////////////////////
149     
150      CServer::openInfoStream(CXios::serverFile);
151      CServer::openErrorStream(CXios::serverFile);
152
153      /////////////////////////////////////////
154      ///////////// PART 4 ////////////////////
155      /////////////////////////////////////////
156
157      CXios::launchDaemonsManager(true) ;
158     
159      /////////////////////////////////////////
160      ///////////// PART 5 ////////////////////
161      /////////////////////////////////////////
162
163      // create the services
164
165      auto ressourcesManager=CXios::getRessourcesManager() ;
166      auto servicesManager=CXios::getServicesManager() ;
167      auto contextsManager=CXios::getContextsManager() ;
168      auto daemonsManager=CXios::getDaemonsManager() ;
169      auto serversRessource=CServer::getServersRessource() ;
170
171      if (serversRessource->isServerLeader())
172      {
173        int nbRessources = ressourcesManager->getRessourcesSize() ;
174        if (!CXios::usingServer2)
175        {
176          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
177          servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::IO_SERVER,nbRessources,1) ;
178        }
179        else
180        {
181          int nprocsServer = nbRessources*CXios::ratioServer2/100.;
182          int nprocsGatherer = nbRessources - nprocsServer ;
183         
184          int nbPoolsServer2 = CXios::nbPoolsServer2 ;
185          if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
186          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
187          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
188          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServerId, CServicesManager::OUT_SERVER, nprocsServer, nbPoolsServer2) ;
189        }
190      }
191
192      /////////////////////////////////////////
193      ///////////// PART 5 ////////////////////
194      /////////////////////////////////////////
195      // loop on event loop
196
197      bool finished=false ;
198      while (!finished)
199      {
200        finished=daemonsManager->eventLoop() ;
201      }
202
203    }
204
205
206
207
208
209    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
210    {
211       
212      MPI_Comm globalComm=CXios::getGlobalComm() ;
213      MPI_Comm xiosGlobalComm ;
214     
215      string strIds=CXios::getin<string>("clients_code_id","") ;
216      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
217     
218      int commRank, globalRank ;
219      MPI_Comm_rank(serverComm, &commRank) ;
220      MPI_Comm_rank(globalComm, &globalRank) ;
221      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
222
223      if (commRank==0) // if root process publish name
224      { 
225        std::ofstream ofs (serverFileName, std::ofstream::out);
226        ofs<<globalRank ;
227        ofs.close();
228      }
229       
230      vector<int> clientsRank(clientsCodeId.size()) ;
231      for(int i=0;i<clientsRank.size();i++)
232      {
233        std::ifstream ifs ;
234        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
235        do
236        {
237          ifs.clear() ;
238          ifs.open(fileName, std::ifstream::in) ;
239        } while (ifs.fail()) ;
240        ifs>>clientsRank[i] ;
241        ifs.close() ; 
242      }
243
244      MPI_Comm intraComm ;
245      MPI_Comm_dup(serverComm,&intraComm) ;
246      MPI_Comm interComm ;
247      for(int i=0 ; i<clientsRank.size(); i++)
248      { 
249        MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
250        MPI_Comm_free(&intraComm) ;
251        MPI_Intercomm_merge(interComm,false, &intraComm ) ;
252      }
253      xiosGlobalComm=intraComm ; 
254      MPI_Barrier(xiosGlobalComm);
255      if (commRank==0) std::remove(serverFileName.c_str()) ;
256      MPI_Barrier(xiosGlobalComm);
257
258      CXios::setXiosComm(xiosGlobalComm) ;
259     
260    }
261
262
263    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
264    {
265        // untested, need to be tested on a true MPI-2 compliant library
266
267        // try to discover other client/server
268/*
269        // publish server name
270        char portName[MPI_MAX_PORT_NAME];
271        int ierr ;
272        int commRank ;
273        MPI_Comm_rank(serverComm, &commRank) ;
274       
275        if (commRank==0) // if root process publish name
276        { 
277          MPI_Open_port(MPI_INFO_NULL, portName);
278          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
279        }
280
281        MPI_Comm intraComm=serverComm ;
282        MPI_Comm interComm ;
283        for(int i=0 ; i<clientsCodeId.size(); i++)
284        { 
285          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
286          MPI_Intercomm_merge(interComm,false, &intraComm ) ;
287        }
288*/     
289    }
290
291
292    void CServer::finalize(void)
293    {
294      CTimer::get("XIOS").suspend() ;
295     
296      delete eventScheduler ;
297
298      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
299        MPI_Comm_free(&(*it));
300
301      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
302        MPI_Comm_free(&(*it));
303
304        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
305          MPI_Comm_free(&(*it));
306
307//      MPI_Comm_free(&intraComm);
308
309      CXios::finalizeDaemonsManager();
310     
311      if (!is_MPI_Initialized)
312      {
313        if (CXios::usingOasis) oasis_finalize();
314        else MPI_Finalize() ;
315      }
316      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
317      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
318      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
319      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
320    }
321
322    /*!
323    * Open a file specified by a suffix and an extension and use it for the given file buffer.
324    * The file name will be suffix+rank+extension.
325    *
326    * \param fileName[in] protype file name
327    * \param ext [in] extension of the file
328    * \param fb [in/out] the file buffer
329    */
330    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
331    {
332      StdStringStream fileNameServer;
333      int numDigit = 0;
334      int commSize = 0;
335      int commRank ;
336      int id;
337     
338      MPI_Comm_size(CXios::getGlobalComm(), &commSize);
339      MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
340
341      while (commSize)
342      {
343        commSize /= 10;
344        ++numDigit;
345      }
346      id = commRank;
347
348      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
349      fb->open(fileNameServer.str().c_str(), std::ios::out);
350      if (!fb->is_open())
351        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
352              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
353    }
354
355    /*!
356    * \brief Open a file stream to write the info logs
357    * Open a file stream with a specific file name suffix+rank
358    * to write the info logs.
359    * \param fileName [in] protype file name
360    */
361    void CServer::openInfoStream(const StdString& fileName)
362    {
363      std::filebuf* fb = m_infoStream.rdbuf();
364      openStream(fileName, ".out", fb);
365
366      info.write2File(fb);
367      report.write2File(fb);
368    }
369
370    //! Write the info logs to standard output
371    void CServer::openInfoStream()
372    {
373      info.write2StdOut();
374      report.write2StdOut();
375    }
376
377    //! Close the info logs file if it opens
378    void CServer::closeInfoStream()
379    {
380      if (m_infoStream.is_open()) m_infoStream.close();
381    }
382
383    /*!
384    * \brief Open a file stream to write the error log
385    * Open a file stream with a specific file name suffix+rank
386    * to write the error log.
387    * \param fileName [in] protype file name
388    */
389    void CServer::openErrorStream(const StdString& fileName)
390    {
391      std::filebuf* fb = m_errorStream.rdbuf();
392      openStream(fileName, ".err", fb);
393
394      error.write2File(fb);
395    }
396
397    //! Write the error log to standard error output
398    void CServer::openErrorStream()
399    {
400      error.write2StdErr();
401    }
402
403    //! Close the error log file if it opens
404    void CServer::closeErrorStream()
405    {
406      if (m_errorStream.is_open()) m_errorStream.close();
407    }
408
409    void CServer::launchServersRessource(MPI_Comm serverComm)
410    {
411      serversRessource_ = new CServersRessource(serverComm) ;
412    }
413}
Note: See TracBrowser for help on using the repository browser.