source: XIOS/dev/dev_ym/XIOS_COUPLING/src/server.cpp @ 2326

Last change on this file since 2326 was 2310, checked in by ymipsl, 2 years ago

Implement small garbage collector for unfreed MPI windows and communicator.

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 14.2 KB
RevLine 
[490]1#include "globalScopeData.hpp"
[591]2#include "xios_spl.hpp"
[300]3#include "cxios.hpp"
[342]4#include "server.hpp"
[983]5#include "client.hpp"
[300]6#include "type.hpp"
7#include "context.hpp"
[352]8#include "object_template.hpp"
[300]9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
[382]12#include "mpi.hpp"
[347]13#include "tracer.hpp"
14#include "timer.hpp"
[492]15#include "event_scheduler.hpp"
[1587]16#include "string_tools.hpp"
[1761]17#include "ressources_manager.hpp"
18#include "services_manager.hpp"
19#include "contexts_manager.hpp"
20#include "servers_ressource.hpp"
21#include <cstdio>
[2146]22#include "workflow_graph.hpp"
[2274]23#include "release_static_allocation.hpp"
[300]24
[1761]25
[2274]26
[335]27namespace xios
[490]28{
[1639]29    MPI_Comm CServer::intraComm ;
[1761]30    MPI_Comm CServer::serversComm_ ;
[1639]31    std::list<MPI_Comm> CServer::interCommLeft ;
32    std::list<MPI_Comm> CServer::interCommRight ;
33    std::list<MPI_Comm> CServer::contextInterComms;
34    std::list<MPI_Comm> CServer::contextIntraComms;
[1021]35    int CServer::serverLevel = 0 ;
[1148]36    int CServer::nbContexts = 0;
[983]37    bool CServer::isRoot = false ;
[1077]38    int CServer::rank_ = INVALID_RANK;
[490]39    StdOFStream CServer::m_infoStream;
[523]40    StdOFStream CServer::m_errorStream;
[490]41    map<string,CContext*> CServer::contextList ;
[1152]42    vector<int> CServer::sndServerGlobalRanks;
[300]43    bool CServer::finished=false ;
44    bool CServer::is_MPI_Initialized ;
[597]45    CEventScheduler* CServer::eventScheduler = 0;
[1761]46    CServersRessource* CServer::serversRessource_=nullptr ;
[983]47
[1765]48       
[1761]49    void CServer::initialize(void)
50    {
51     
52      MPI_Comm serverComm ;
53      int initialized ;
54      MPI_Initialized(&initialized) ;
55      if (initialized) is_MPI_Initialized=true ;
56      else is_MPI_Initialized=false ;
57      MPI_Comm globalComm=CXios::getGlobalComm() ;
58
59      /////////////////////////////////////////
60      ///////////// PART 1 ////////////////////
61      /////////////////////////////////////////
62      // don't use OASIS
63      if (!CXios::usingOasis)
64      {
65        if (!is_MPI_Initialized) MPI_Init(NULL, NULL);
66       
67        // split the global communicator
68        // get hash from all model to attribute a unique color (int) and then split to get client communicator
69        // every mpi process of globalComm (MPI_COMM_WORLD) must participate
70         
71        int commRank, commSize ;
72        MPI_Comm_rank(globalComm,&commRank) ;
73        MPI_Comm_size(globalComm,&commSize) ;
74
75        std::hash<string> hashString ;
76        size_t hashServer=hashString(CXios::xiosCodeId) ;
77         
78        size_t* hashAll = new size_t[commSize] ;
[2242]79        MPI_Allgather(&hashServer,1,MPI_SIZE_T,hashAll,1,MPI_SIZE_T,globalComm) ;
[1761]80         
81        int color=0 ;
[2242]82        map<size_t,int> listHash ;
83        for(int i=0 ; i<=commSize ; i++) 
84          if (listHash.count(hashAll[i])==0) 
[1761]85          {
[2242]86            listHash[hashAll[i]]=color ;
[1761]87            color=color+1 ;
88          }
[2242]89        color=listHash[hashServer] ;
[1761]90        delete[] hashAll ;
91
92        MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
93      }
94      else // using OASIS
95      {
96        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
97
98        oasis_get_localcomm(serverComm);
99      }
[2290]100      CTimer::get("XIOS").resume() ;
101      CTimer::get("XIOS initialize").resume() ;
[1761]102 
103      /////////////////////////////////////////
104      ///////////// PART 2 ////////////////////
105      /////////////////////////////////////////
106     
107
108      // Create the XIOS communicator for every process which is related
109      // to XIOS, as well on client side as on server side
110      MPI_Comm xiosGlobalComm ;
111      string strIds=CXios::getin<string>("clients_code_id","") ;
112      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
113      if (strIds.empty())
114      {
115        // no code Ids given, suppose XIOS initialisation is global           
116        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
117        MPI_Comm splitComm,interComm ;
118        MPI_Comm_rank(globalComm,&commGlobalRank) ;
119        MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
120        MPI_Comm_rank(splitComm,&commRank) ;
121        if (commRank==0) serverLeader=commGlobalRank ;
122        else serverLeader=0 ;
123        clientLeader=0 ;
124        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
125        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
126        MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
127        MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
128        CXios::setXiosComm(xiosGlobalComm) ;
129      }
130      else
131      {
132
133        xiosGlobalCommByFileExchange(serverComm) ;
134
135      }
136     
137      /////////////////////////////////////////
138      ///////////// PART 4 ////////////////////
139      //  create servers intra communicator  //
140      /////////////////////////////////////////
141     
142      int commRank ;
143      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
144      MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
145     
146      CXios::setUsingServer() ;
147
148      /////////////////////////////////////////
149      ///////////// PART 5 ////////////////////
150      //       redirect files output         //
151      /////////////////////////////////////////
152     
153      CServer::openInfoStream(CXios::serverFile);
154      CServer::openErrorStream(CXios::serverFile);
155
156      /////////////////////////////////////////
157      ///////////// PART 4 ////////////////////
158      /////////////////////////////////////////
159
160      CXios::launchDaemonsManager(true) ;
161     
162      /////////////////////////////////////////
163      ///////////// PART 5 ////////////////////
164      /////////////////////////////////////////
165
166      // create the services
167
168      auto ressourcesManager=CXios::getRessourcesManager() ;
169      auto servicesManager=CXios::getServicesManager() ;
170      auto contextsManager=CXios::getContextsManager() ;
171      auto daemonsManager=CXios::getDaemonsManager() ;
172      auto serversRessource=CServer::getServersRessource() ;
173
174      if (serversRessource->isServerLeader())
175      {
176        int nbRessources = ressourcesManager->getRessourcesSize() ;
177        if (!CXios::usingServer2)
178        {
179          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
180          servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::IO_SERVER,nbRessources,1) ;
181        }
182        else
183        {
184          int nprocsServer = nbRessources*CXios::ratioServer2/100.;
185          int nprocsGatherer = nbRessources - nprocsServer ;
186         
187          int nbPoolsServer2 = CXios::nbPoolsServer2 ;
188          if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
189          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
190          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
191          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServerId, CServicesManager::OUT_SERVER, nprocsServer, nbPoolsServer2) ;
192        }
193      }
[2242]194      CTimer::get("XIOS initialize").suspend() ;
[1761]195
196      /////////////////////////////////////////
197      ///////////// PART 5 ////////////////////
198      /////////////////////////////////////////
199      // loop on event loop
200
201      bool finished=false ;
[2242]202      CTimer::get("XIOS event loop").resume() ;
203
[1761]204      while (!finished)
205      {
206        finished=daemonsManager->eventLoop() ;
207      }
[2242]208      CTimer::get("XIOS event loop").suspend() ;
[2243]209
210      // Delete CContext
[2274]211      //CObjectTemplate<CContext>::cleanStaticDataStructure();
[1761]212    }
213
214
215
216
217
218    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
219    {
220       
221      MPI_Comm globalComm=CXios::getGlobalComm() ;
222      MPI_Comm xiosGlobalComm ;
223     
224      string strIds=CXios::getin<string>("clients_code_id","") ;
225      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
226     
227      int commRank, globalRank ;
228      MPI_Comm_rank(serverComm, &commRank) ;
229      MPI_Comm_rank(globalComm, &globalRank) ;
230      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
231
232      if (commRank==0) // if root process publish name
233      { 
234        std::ofstream ofs (serverFileName, std::ofstream::out);
235        ofs<<globalRank ;
236        ofs.close();
237      }
238       
239      vector<int> clientsRank(clientsCodeId.size()) ;
240      for(int i=0;i<clientsRank.size();i++)
241      {
242        std::ifstream ifs ;
243        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
244        do
245        {
246          ifs.clear() ;
247          ifs.open(fileName, std::ifstream::in) ;
248        } while (ifs.fail()) ;
249        ifs>>clientsRank[i] ;
250        ifs.close() ; 
251      }
252
253      MPI_Comm intraComm ;
254      MPI_Comm_dup(serverComm,&intraComm) ;
255      MPI_Comm interComm ;
256      for(int i=0 ; i<clientsRank.size(); i++)
257      { 
258        MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
259        MPI_Comm_free(&intraComm) ;
260        MPI_Intercomm_merge(interComm,false, &intraComm ) ;
261      }
262      xiosGlobalComm=intraComm ; 
263      MPI_Barrier(xiosGlobalComm);
264      if (commRank==0) std::remove(serverFileName.c_str()) ;
265      MPI_Barrier(xiosGlobalComm);
266
267      CXios::setXiosComm(xiosGlobalComm) ;
268     
269    }
270
271
272    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
273    {
274        // untested, need to be tested on a true MPI-2 compliant library
275
276        // try to discover other client/server
277/*
278        // publish server name
279        char portName[MPI_MAX_PORT_NAME];
280        int ierr ;
281        int commRank ;
282        MPI_Comm_rank(serverComm, &commRank) ;
283       
284        if (commRank==0) // if root process publish name
285        { 
286          MPI_Open_port(MPI_INFO_NULL, portName);
287          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
288        }
289
290        MPI_Comm intraComm=serverComm ;
291        MPI_Comm interComm ;
292        for(int i=0 ; i<clientsCodeId.size(); i++)
293        { 
294          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
295          MPI_Intercomm_merge(interComm,false, &intraComm ) ;
296        }
297*/     
298    }
299
[490]300
[300]301    void CServer::finalize(void)
302    {
[361]303      CTimer::get("XIOS").suspend() ;
[697]304     
[492]305      delete eventScheduler ;
[655]306
[1639]307      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
308        MPI_Comm_free(&(*it));
[983]309
[1639]310      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
311        MPI_Comm_free(&(*it));
[1071]312
[1639]313        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
314          MPI_Comm_free(&(*it));
[992]315
[1761]316//      MPI_Comm_free(&intraComm);
[2266]317      CXios::finalizeDaemonsManager();
[2274]318      finalizeServersRessource();
[1764]319     
[2274]320      CContext::removeAllContexts() ; // free memory for related context
321         
[2310]322      CXios::getMpiGarbageCollector().release() ; // release unfree MPI ressources
323
[300]324      if (!is_MPI_Initialized)
[490]325      {
[300]326        if (CXios::usingOasis) oasis_finalize();
[1639]327        else MPI_Finalize() ;
[300]328      }
[347]329      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
330      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
331      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
[1158]332      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
[2274]333     
[2146]334      CWorkflowGraph::drawWorkFlowGraph_server();
[2274]335      xios::releaseStaticAllocation() ; // free memory from static allocation
[300]336    }
[490]337
[523]338    /*!
339    * Open a file specified by a suffix and an extension and use it for the given file buffer.
340    * The file name will be suffix+rank+extension.
341    *
342    * \param fileName[in] protype file name
343    * \param ext [in] extension of the file
344    * \param fb [in/out] the file buffer
345    */
346    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
347    {
[1761]348      StdStringStream fileNameServer;
[523]349      int numDigit = 0;
[1761]350      int commSize = 0;
351      int commRank ;
[1021]352      int id;
[1761]353     
354      MPI_Comm_size(CXios::getGlobalComm(), &commSize);
355      MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
356
357      while (commSize)
[523]358      {
[1761]359        commSize /= 10;
[523]360        ++numDigit;
361      }
[1761]362      id = commRank;
[497]363
[1761]364      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
365      fb->open(fileNameServer.str().c_str(), std::ios::out);
[523]366      if (!fb->is_open())
367        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
[1761]368              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
[523]369    }
[490]370
[523]371    /*!
372    * \brief Open a file stream to write the info logs
373    * Open a file stream with a specific file name suffix+rank
374    * to write the info logs.
375    * \param fileName [in] protype file name
376    */
377    void CServer::openInfoStream(const StdString& fileName)
378    {
379      std::filebuf* fb = m_infoStream.rdbuf();
380      openStream(fileName, ".out", fb);
[490]381
[523]382      info.write2File(fb);
383      report.write2File(fb);
384    }
[490]385
[523]386    //! Write the info logs to standard output
387    void CServer::openInfoStream()
388    {
389      info.write2StdOut();
390      report.write2StdOut();
391    }
[490]392
[523]393    //! Close the info logs file if it opens
394    void CServer::closeInfoStream()
395    {
396      if (m_infoStream.is_open()) m_infoStream.close();
397    }
398
399    /*!
400    * \brief Open a file stream to write the error log
401    * Open a file stream with a specific file name suffix+rank
402    * to write the error log.
403    * \param fileName [in] protype file name
404    */
405    void CServer::openErrorStream(const StdString& fileName)
406    {
407      std::filebuf* fb = m_errorStream.rdbuf();
408      openStream(fileName, ".err", fb);
409
410      error.write2File(fb);
411    }
412
413    //! Write the error log to standard error output
414    void CServer::openErrorStream()
415    {
416      error.write2StdErr();
417    }
418
419    //! Close the error log file if it opens
420    void CServer::closeErrorStream()
421    {
422      if (m_errorStream.is_open()) m_errorStream.close();
423    }
[1761]424
425    void CServer::launchServersRessource(MPI_Comm serverComm)
426    {
427      serversRessource_ = new CServersRessource(serverComm) ;
428    }
[2274]429
430    void  CServer::finalizeServersRessource(void) 
431    { 
432      delete serversRessource_; serversRessource_=nullptr ;
433    }
[300]434}
Note: See TracBrowser for help on using the repository browser.