source: XIOS/dev/dev_ym/XIOS_COUPLING/src/server.cpp @ 2242

Last change on this file since 2242 was 2242, checked in by ymipsl, 3 years ago

Bug fix in when split global communicator between clients and servers. Did'nt work with cyclic distribution of servers because it was assume than clients and servers partionning was contiguous (missing file in commit)
YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 13.7 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "client.hpp"
6#include "type.hpp"
7#include "context.hpp"
8#include "object_template.hpp"
9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "event_scheduler.hpp"
16#include "string_tools.hpp"
17#include "ressources_manager.hpp"
18#include "services_manager.hpp"
19#include "contexts_manager.hpp"
20#include "servers_ressource.hpp"
21#include <cstdio>
22#include "workflow_graph.hpp"
23
24
25namespace xios
26{
27    MPI_Comm CServer::intraComm ;
28    MPI_Comm CServer::serversComm_ ;
29    std::list<MPI_Comm> CServer::interCommLeft ;
30    std::list<MPI_Comm> CServer::interCommRight ;
31    std::list<MPI_Comm> CServer::contextInterComms;
32    std::list<MPI_Comm> CServer::contextIntraComms;
33    int CServer::serverLevel = 0 ;
34    int CServer::nbContexts = 0;
35    bool CServer::isRoot = false ;
36    int CServer::rank_ = INVALID_RANK;
37    StdOFStream CServer::m_infoStream;
38    StdOFStream CServer::m_errorStream;
39    map<string,CContext*> CServer::contextList ;
40    vector<int> CServer::sndServerGlobalRanks;
41    bool CServer::finished=false ;
42    bool CServer::is_MPI_Initialized ;
43    CEventScheduler* CServer::eventScheduler = 0;
44    CServersRessource* CServer::serversRessource_=nullptr ;
45
46       
47    void CServer::initialize(void)
48    {
49     
50      MPI_Comm serverComm ;
51      int initialized ;
52      MPI_Initialized(&initialized) ;
53      if (initialized) is_MPI_Initialized=true ;
54      else is_MPI_Initialized=false ;
55      MPI_Comm globalComm=CXios::getGlobalComm() ;
56
57      /////////////////////////////////////////
58      ///////////// PART 1 ////////////////////
59      /////////////////////////////////////////
60      CTimer::get("XIOS").resume() ;
61      CTimer::get("XIOS initialize").resume() ;
62      // don't use OASIS
63      if (!CXios::usingOasis)
64      {
65        if (!is_MPI_Initialized) MPI_Init(NULL, NULL);
66       
67        // split the global communicator
68        // get hash from all model to attribute a unique color (int) and then split to get client communicator
69        // every mpi process of globalComm (MPI_COMM_WORLD) must participate
70         
71        int commRank, commSize ;
72        MPI_Comm_rank(globalComm,&commRank) ;
73        MPI_Comm_size(globalComm,&commSize) ;
74
75        std::hash<string> hashString ;
76        size_t hashServer=hashString(CXios::xiosCodeId) ;
77         
78        size_t* hashAll = new size_t[commSize] ;
79        MPI_Allgather(&hashServer,1,MPI_SIZE_T,hashAll,1,MPI_SIZE_T,globalComm) ;
80         
81        int color=0 ;
82        map<size_t,int> listHash ;
83        for(int i=0 ; i<=commSize ; i++) 
84          if (listHash.count(hashAll[i])==0) 
85          {
86            listHash[hashAll[i]]=color ;
87            color=color+1 ;
88          }
89        color=listHash[hashServer] ;
90        delete[] hashAll ;
91
92        MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
93      }
94      else // using OASIS
95      {
96        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
97
98        oasis_get_localcomm(serverComm);
99      }
100 
101      /////////////////////////////////////////
102      ///////////// PART 2 ////////////////////
103      /////////////////////////////////////////
104     
105
106      // Create the XIOS communicator for every process which is related
107      // to XIOS, as well on client side as on server side
108      MPI_Comm xiosGlobalComm ;
109      string strIds=CXios::getin<string>("clients_code_id","") ;
110      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
111      if (strIds.empty())
112      {
113        // no code Ids given, suppose XIOS initialisation is global           
114        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
115        MPI_Comm splitComm,interComm ;
116        MPI_Comm_rank(globalComm,&commGlobalRank) ;
117        MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
118        MPI_Comm_rank(splitComm,&commRank) ;
119        if (commRank==0) serverLeader=commGlobalRank ;
120        else serverLeader=0 ;
121        clientLeader=0 ;
122        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
123        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
124        MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
125        MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
126        CXios::setXiosComm(xiosGlobalComm) ;
127      }
128      else
129      {
130
131        xiosGlobalCommByFileExchange(serverComm) ;
132
133      }
134     
135      /////////////////////////////////////////
136      ///////////// PART 4 ////////////////////
137      //  create servers intra communicator  //
138      /////////////////////////////////////////
139     
140      int commRank ;
141      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
142      MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
143     
144      CXios::setUsingServer() ;
145
146      /////////////////////////////////////////
147      ///////////// PART 5 ////////////////////
148      //       redirect files output         //
149      /////////////////////////////////////////
150     
151      CServer::openInfoStream(CXios::serverFile);
152      CServer::openErrorStream(CXios::serverFile);
153
154      /////////////////////////////////////////
155      ///////////// PART 4 ////////////////////
156      /////////////////////////////////////////
157
158      CXios::launchDaemonsManager(true) ;
159     
160      /////////////////////////////////////////
161      ///////////// PART 5 ////////////////////
162      /////////////////////////////////////////
163
164      // create the services
165
166      auto ressourcesManager=CXios::getRessourcesManager() ;
167      auto servicesManager=CXios::getServicesManager() ;
168      auto contextsManager=CXios::getContextsManager() ;
169      auto daemonsManager=CXios::getDaemonsManager() ;
170      auto serversRessource=CServer::getServersRessource() ;
171
172      if (serversRessource->isServerLeader())
173      {
174        int nbRessources = ressourcesManager->getRessourcesSize() ;
175        if (!CXios::usingServer2)
176        {
177          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
178          servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::IO_SERVER,nbRessources,1) ;
179        }
180        else
181        {
182          int nprocsServer = nbRessources*CXios::ratioServer2/100.;
183          int nprocsGatherer = nbRessources - nprocsServer ;
184         
185          int nbPoolsServer2 = CXios::nbPoolsServer2 ;
186          if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
187          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
188          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
189          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServerId, CServicesManager::OUT_SERVER, nprocsServer, nbPoolsServer2) ;
190        }
191      }
192      CTimer::get("XIOS initialize").suspend() ;
193
194      /////////////////////////////////////////
195      ///////////// PART 5 ////////////////////
196      /////////////////////////////////////////
197      // loop on event loop
198
199      bool finished=false ;
200      CTimer::get("XIOS event loop").resume() ;
201
202      while (!finished)
203      {
204        finished=daemonsManager->eventLoop() ;
205      }
206      CTimer::get("XIOS event loop").suspend() ;
207    }
208
209
210
211
212
213    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
214    {
215       
216      MPI_Comm globalComm=CXios::getGlobalComm() ;
217      MPI_Comm xiosGlobalComm ;
218     
219      string strIds=CXios::getin<string>("clients_code_id","") ;
220      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
221     
222      int commRank, globalRank ;
223      MPI_Comm_rank(serverComm, &commRank) ;
224      MPI_Comm_rank(globalComm, &globalRank) ;
225      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
226
227      if (commRank==0) // if root process publish name
228      { 
229        std::ofstream ofs (serverFileName, std::ofstream::out);
230        ofs<<globalRank ;
231        ofs.close();
232      }
233       
234      vector<int> clientsRank(clientsCodeId.size()) ;
235      for(int i=0;i<clientsRank.size();i++)
236      {
237        std::ifstream ifs ;
238        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
239        do
240        {
241          ifs.clear() ;
242          ifs.open(fileName, std::ifstream::in) ;
243        } while (ifs.fail()) ;
244        ifs>>clientsRank[i] ;
245        ifs.close() ; 
246      }
247
248      MPI_Comm intraComm ;
249      MPI_Comm_dup(serverComm,&intraComm) ;
250      MPI_Comm interComm ;
251      for(int i=0 ; i<clientsRank.size(); i++)
252      { 
253        MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
254        MPI_Comm_free(&intraComm) ;
255        MPI_Intercomm_merge(interComm,false, &intraComm ) ;
256      }
257      xiosGlobalComm=intraComm ; 
258      MPI_Barrier(xiosGlobalComm);
259      if (commRank==0) std::remove(serverFileName.c_str()) ;
260      MPI_Barrier(xiosGlobalComm);
261
262      CXios::setXiosComm(xiosGlobalComm) ;
263     
264    }
265
266
267    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
268    {
269        // untested, need to be tested on a true MPI-2 compliant library
270
271        // try to discover other client/server
272/*
273        // publish server name
274        char portName[MPI_MAX_PORT_NAME];
275        int ierr ;
276        int commRank ;
277        MPI_Comm_rank(serverComm, &commRank) ;
278       
279        if (commRank==0) // if root process publish name
280        { 
281          MPI_Open_port(MPI_INFO_NULL, portName);
282          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
283        }
284
285        MPI_Comm intraComm=serverComm ;
286        MPI_Comm interComm ;
287        for(int i=0 ; i<clientsCodeId.size(); i++)
288        { 
289          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
290          MPI_Intercomm_merge(interComm,false, &intraComm ) ;
291        }
292*/     
293    }
294
295
296    void CServer::finalize(void)
297    {
298      CTimer::get("XIOS").suspend() ;
299     
300      delete eventScheduler ;
301
302      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
303        MPI_Comm_free(&(*it));
304
305      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
306        MPI_Comm_free(&(*it));
307
308        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
309          MPI_Comm_free(&(*it));
310
311//      MPI_Comm_free(&intraComm);
312
313      CXios::finalizeDaemonsManager();
314     
315      if (!is_MPI_Initialized)
316      {
317        if (CXios::usingOasis) oasis_finalize();
318        else MPI_Finalize() ;
319      }
320      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
321      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
322      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
323      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
324
325      CWorkflowGraph::drawWorkFlowGraph_server();
326    }
327
328    /*!
329    * Open a file specified by a suffix and an extension and use it for the given file buffer.
330    * The file name will be suffix+rank+extension.
331    *
332    * \param fileName[in] protype file name
333    * \param ext [in] extension of the file
334    * \param fb [in/out] the file buffer
335    */
336    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
337    {
338      StdStringStream fileNameServer;
339      int numDigit = 0;
340      int commSize = 0;
341      int commRank ;
342      int id;
343     
344      MPI_Comm_size(CXios::getGlobalComm(), &commSize);
345      MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
346
347      while (commSize)
348      {
349        commSize /= 10;
350        ++numDigit;
351      }
352      id = commRank;
353
354      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
355      fb->open(fileNameServer.str().c_str(), std::ios::out);
356      if (!fb->is_open())
357        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
358              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
359    }
360
361    /*!
362    * \brief Open a file stream to write the info logs
363    * Open a file stream with a specific file name suffix+rank
364    * to write the info logs.
365    * \param fileName [in] protype file name
366    */
367    void CServer::openInfoStream(const StdString& fileName)
368    {
369      std::filebuf* fb = m_infoStream.rdbuf();
370      openStream(fileName, ".out", fb);
371
372      info.write2File(fb);
373      report.write2File(fb);
374    }
375
376    //! Write the info logs to standard output
377    void CServer::openInfoStream()
378    {
379      info.write2StdOut();
380      report.write2StdOut();
381    }
382
383    //! Close the info logs file if it opens
384    void CServer::closeInfoStream()
385    {
386      if (m_infoStream.is_open()) m_infoStream.close();
387    }
388
389    /*!
390    * \brief Open a file stream to write the error log
391    * Open a file stream with a specific file name suffix+rank
392    * to write the error log.
393    * \param fileName [in] protype file name
394    */
395    void CServer::openErrorStream(const StdString& fileName)
396    {
397      std::filebuf* fb = m_errorStream.rdbuf();
398      openStream(fileName, ".err", fb);
399
400      error.write2File(fb);
401    }
402
403    //! Write the error log to standard error output
404    void CServer::openErrorStream()
405    {
406      error.write2StdErr();
407    }
408
409    //! Close the error log file if it opens
410    void CServer::closeErrorStream()
411    {
412      if (m_errorStream.is_open()) m_errorStream.close();
413    }
414
415    void CServer::launchServersRessource(MPI_Comm serverComm)
416    {
417      serversRessource_ = new CServersRessource(serverComm) ;
418    }
419}
Note: See TracBrowser for help on using the repository browser.