source: XIOS3/trunk/src/server.cpp @ 2420

Last change on this file since 2420 was 2420, checked in by jderouillat, 19 months ago

Add an option (log_memory : set to false by default), to activate memory monitoring. Logs are now buffered.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 18.6 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "client.hpp"
6#include "type.hpp"
7#include "context.hpp"
8#include "object_template.hpp"
9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "mem_checker.hpp"
16#include "event_scheduler.hpp"
17#include "string_tools.hpp"
18#include "ressources_manager.hpp"
19#include "services_manager.hpp"
20#include "contexts_manager.hpp"
21#include "servers_ressource.hpp"
22#include "services.hpp"
23#include <cstdio>
24#include "workflow_graph.hpp"
25#include "release_static_allocation.hpp"
26#include <sys/stat.h>
27#include <unistd.h>
28
29
30
31namespace xios
32{
33    MPI_Comm CServer::intraComm_ ;
34    MPI_Comm CServer::serversComm_ ;
35    std::list<MPI_Comm> CServer::interCommLeft ;
36    std::list<MPI_Comm> CServer::interCommRight ;
37    std::list<MPI_Comm> CServer::contextInterComms;
38    std::list<MPI_Comm> CServer::contextIntraComms;
39    int CServer::serverLevel = 0 ;
40    int CServer::nbContexts = 0;
41    bool CServer::isRoot = false ;
42    int CServer::rank_ = INVALID_RANK;
43    StdOFStream CServer::m_infoStream;
44    StdOFStream CServer::m_errorStream;
45    map<string,CContext*> CServer::contextList ;
46    vector<int> CServer::sndServerGlobalRanks;
47    bool CServer::finished=false ;
48    bool CServer::is_MPI_Initialized ;
49    CEventScheduler* CServer::eventScheduler = 0;
50    CServersRessource* CServer::serversRessource_=nullptr ;
51    CThirdPartyDriver* CServer::driver_ =nullptr ;
52
53       
54    void CServer::initialize(void)
55    {
56     
57      MPI_Comm serverComm ;
58      int initialized ;
59      MPI_Initialized(&initialized) ;
60      if (initialized) is_MPI_Initialized=true ;
61      else is_MPI_Initialized=false ;
62      MPI_Comm globalComm=CXios::getGlobalComm() ;
63      CTimer::get("XIOS server").resume() ;
64      /////////////////////////////////////////
65      ///////////// PART 1 ////////////////////
66      /////////////////////////////////////////
67      // don't use OASIS
68      if (!CXios::usingOasis)
69      {
70        if (!is_MPI_Initialized) MPI_Init(NULL, NULL);
71       
72        // split the global communicator
73        // get hash from all model to attribute a unique color (int) and then split to get client communicator
74        // every mpi process of globalComm (MPI_COMM_WORLD) must participate
75         
76        int commRank, commSize ;
77        MPI_Comm_rank(globalComm,&commRank) ;
78        MPI_Comm_size(globalComm,&commSize) ;
79
80        std::hash<string> hashString ;
81        size_t hashServer=hashString(CXios::xiosCodeId) ;
82         
83        size_t* hashAll = new size_t[commSize] ;
84        MPI_Allgather(&hashServer,1,MPI_SIZE_T,hashAll,1,MPI_SIZE_T,globalComm) ;
85         
86        int color=0 ;
87        map<size_t,int> listHash ;
88        for(int i=0 ; i<=commSize ; i++) 
89          if (listHash.count(hashAll[i])==0) 
90          {
91            listHash[hashAll[i]]=color ;
92            color=color+1 ;
93          }
94        color=listHash[hashServer] ;
95        delete[] hashAll ;
96
97        MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
98      }
99      else // using OASIS
100      {
101        if (!is_MPI_Initialized) driver_ = new CThirdPartyDriver();
102
103        driver_->getComponentCommunicator( serverComm );
104      }
105      MPI_Comm_dup(serverComm, &intraComm_);
106     
107      CTimer::get("XIOS").resume() ;
108      CTimer::get("XIOS initialize").resume() ;
109 
110      /////////////////////////////////////////
111      ///////////// PART 2 ////////////////////
112      /////////////////////////////////////////
113     
114
115      // Create the XIOS communicator for every process which is related
116      // to XIOS, as well on client side as on server side
117      MPI_Comm xiosGlobalComm ;
118      string strIds=CXios::getin<string>("clients_code_id","") ;
119      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
120      if (strIds.empty())
121      {
122        // no code Ids given, suppose XIOS initialisation is global           
123        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
124        MPI_Comm splitComm,interComm ;
125        MPI_Comm_rank(globalComm,&commGlobalRank) ;
126        MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
127        MPI_Comm_rank(splitComm,&commRank) ;
128        if (commRank==0) serverLeader=commGlobalRank ;
129        else serverLeader=0 ;
130        clientLeader=0 ;
131        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
132        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
133        MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
134        MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
135        CXios::setXiosComm(xiosGlobalComm) ;
136      }
137      else
138      {
139
140        xiosGlobalCommByFileExchange(serverComm) ;
141
142      }
143     
144      /////////////////////////////////////////
145      ///////////// PART 4 ////////////////////
146      //  create servers intra communicator  //
147      /////////////////////////////////////////
148     
149      int commRank ;
150      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
151      MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
152     
153      CXios::setUsingServer() ;
154
155      /////////////////////////////////////////
156      ///////////// PART 5 ////////////////////
157      //       redirect files output         //
158      /////////////////////////////////////////
159     
160      CServer::openInfoStream(CXios::serverFile);
161      CServer::openErrorStream(CXios::serverFile);
162
163      CMemChecker::logMem( "CServer::initialize" );
164
165      /////////////////////////////////////////
166      ///////////// PART 4 ////////////////////
167      /////////////////////////////////////////
168
169      CXios::launchDaemonsManager(true) ;
170     
171      /////////////////////////////////////////
172      ///////////// PART 5 ////////////////////
173      /////////////////////////////////////////
174
175      // create the services
176
177      auto ressourcesManager=CXios::getRessourcesManager() ;
178      auto servicesManager=CXios::getServicesManager() ;
179      auto contextsManager=CXios::getContextsManager() ;
180      auto daemonsManager=CXios::getDaemonsManager() ;
181      auto serversRessource=CServer::getServersRessource() ;
182
183      int rank;
184      MPI_Comm_rank(intraComm_, &rank) ;
185      if (rank==0) isRoot=true;
186      else isRoot=false;
187
188      if (serversRessource->isServerLeader())
189      {
190        int nbRessources = ressourcesManager->getRessourcesSize() ;
191        if (!CXios::usingServer2)
192        {
193          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
194          servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ;
195          servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ;
196        }
197        else
198        {
199          int nprocsServer = nbRessources*CXios::ratioServer2/100.;
200          int nprocsGatherer = nbRessources - nprocsServer ;
201         
202          int nbPoolsServer2 = CXios::nbPoolsServer2 ;
203          if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
204          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
205          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
206          servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ;
207          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ;
208
209
210        }
211//        servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ;
212      }
213      CTimer::get("XIOS initialize").suspend() ;
214
215      /////////////////////////////////////////
216      ///////////// PART 5 ////////////////////
217      /////////////////////////////////////////
218      // loop on event loop
219
220      bool finished=false ;
221      CTimer::get("XIOS event loop").resume() ;
222
223      while (!finished)
224      {
225        finished=daemonsManager->eventLoop() ;
226      }
227      CTimer::get("XIOS event loop").suspend() ;
228
229      // Delete CContext
230      //CObjectTemplate<CContext>::cleanStaticDataStructure();
231    }
232
233
234
235
236
237    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
238    {
239       
240      MPI_Comm globalComm=CXios::getGlobalComm() ;
241      MPI_Comm xiosGlobalComm ;
242     
243      string strIds=CXios::getin<string>("clients_code_id","") ;
244      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
245     
246      int commRank, globalRank ;
247      MPI_Comm_rank(serverComm, &commRank) ;
248      MPI_Comm_rank(globalComm, &globalRank) ;
249      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
250
251      if (commRank==0) // if root process publish name
252      { 
253        std::ofstream ofs (serverFileName, std::ofstream::out);
254        ofs<<globalRank ;
255        ofs.close();
256      }
257       
258      vector<int> clientsRank(clientsCodeId.size()) ;
259      for(int i=0;i<clientsRank.size();i++)
260      {
261        std::ifstream ifs ;
262        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
263        struct stat buffer;
264        do {
265        } while( stat(fileName.c_str(), &buffer) != 0 );
266        sleep(1);
267        ifs.open(fileName, ifstream::in) ;
268        ifs>>clientsRank[i] ;
269        //cout <<  "\t\t read: " << clientsRank[i] << " in " << fileName << endl;
270        ifs.close() ; 
271      }
272
273      MPI_Comm intraComm ;
274      MPI_Comm_dup(serverComm,&intraComm) ;
275      MPI_Comm interComm ;
276      for(int i=0 ; i<clientsRank.size(); i++)
277      { 
278        MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
279        interCommLeft.push_back(interComm) ;
280        MPI_Comm_free(&intraComm) ;
281        MPI_Intercomm_merge(interComm,false, &intraComm ) ;
282      }
283      xiosGlobalComm=intraComm ; 
284      MPI_Barrier(xiosGlobalComm);
285      if (commRank==0) std::remove(serverFileName.c_str()) ;
286      MPI_Barrier(xiosGlobalComm);
287
288      CXios::setXiosComm(xiosGlobalComm) ;
289     
290    }
291
292
293    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
294    {
295        // untested, need to be tested on a true MPI-2 compliant library
296
297        // try to discover other client/server
298/*
299        // publish server name
300        char portName[MPI_MAX_PORT_NAME];
301        int ierr ;
302        int commRank ;
303        MPI_Comm_rank(serverComm, &commRank) ;
304       
305        if (commRank==0) // if root process publish name
306        { 
307          MPI_Open_port(MPI_INFO_NULL, portName);
308          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
309        }
310
311        MPI_Comm intraComm=serverComm ;
312        MPI_Comm interComm ;
313        for(int i=0 ; i<clientsCodeId.size(); i++)
314        { 
315          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
316          MPI_Intercomm_merge(interComm,false, &intraComm ) ;
317        }
318*/     
319    }
320
321   /*!
322    * Root process is listening for an order sent by client to call "oasis_enddef".
323    * The root client of a compound send the order (tag 5). It is probed and received.
324    * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any).
325    * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done
326    */
327   
328     void CServer::listenOasisEnddef(void)
329     {
330        int flag ;
331        MPI_Status status ;
332        list<MPI_Comm>::iterator it;
333        int msg ;
334        static int nbCompound=0 ;
335        int size ;
336        static bool sent=false ;
337        static MPI_Request* allRequests ;
338        static MPI_Status* allStatus ;
339
340
341        if (sent)
342        {
343          MPI_Comm_size(intraComm_,&size) ;
344          MPI_Testall(size,allRequests, &flag, allStatus) ;
345          if (flag==true)
346          {
347            delete [] allRequests ;
348            delete [] allStatus ;
349            sent=false ;
350          }
351        }
352       
353
354        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
355        {
356           MPI_Status status ;
357           traceOff() ;
358           MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5
359           traceOn() ;
360           if (flag==true)
361           {
362              MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5
363              nbCompound++ ;
364              if (nbCompound==interCommLeft.size())
365              {
366                MPI_Comm_size(intraComm_,&size) ;
367                allRequests= new MPI_Request[size] ;
368                allStatus= new MPI_Status[size] ;
369                for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm_,&allRequests[i]) ; // tags oasis_endded = 5
370                sent=true ;
371              }
372           }
373        }
374}
375     
376   /*!
377    * Processes probes message from root process if oasis_enddef call must be done.
378    * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator
379    */
380     void CServer::listenRootOasisEnddef(void)
381     {
382       int flag ;
383       MPI_Status status ;
384       const int root=0 ;
385       int msg ;
386       static bool eventSent=false ;
387
388       if (eventSent)
389       {
390         boost::hash<string> hashString;
391         size_t hashId = hashString("oasis_enddef");
392         if (CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->queryEvent(0,hashId))
393         {
394           CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->popEvent() ;
395           driver_->endSynchronizedDefinition() ;
396           eventSent=false ;
397         }
398       }
399         
400       traceOff() ;
401       MPI_Iprobe(root,5,intraComm_, &flag, &status) ;
402       traceOn() ;
403       if (flag==true)
404       {
405           MPI_Recv(&msg,1,MPI_INT,root,5,intraComm_,&status) ; // tags oasis_endded = 5
406           boost::hash<string> hashString;
407           size_t hashId = hashString("oasis_enddef");
408           CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->registerEvent(0,hashId);
409           eventSent=true ;
410       }
411     }
412
413    void CServer::finalize(void)
414    {
415      CTimer::get("XIOS").suspend() ;
416      CTimer::get("XIOS server").suspend() ;
417      delete eventScheduler ;
418
419      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
420        MPI_Comm_free(&(*it));
421
422      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
423        MPI_Comm_free(&(*it));
424
425        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
426          MPI_Comm_free(&(*it));
427
428//      MPI_Comm_free(&intraComm);
429      CXios::finalizeDaemonsManager();
430      finalizeServersRessource();
431     
432      CContext::removeAllContexts() ; // free memory for related context
433         
434      CXios::getMpiGarbageCollector().release() ; // release unfree MPI ressources
435
436      CMemChecker::logMem( "CServer::finalize", true );
437      if (!is_MPI_Initialized)
438      {
439        if (CXios::usingOasis) delete driver_;
440        else MPI_Finalize() ;
441      }
442      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
443      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
444      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
445      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
446      report(100)<<CMemChecker::getAllCumulatedMem()<<endl ;
447     
448      CWorkflowGraph::drawWorkFlowGraph_server();
449      xios::releaseStaticAllocation() ; // free memory from static allocation
450    }
451
452    /*!
453    * Open a file specified by a suffix and an extension and use it for the given file buffer.
454    * The file name will be suffix+rank+extension.
455    *
456    * \param fileName[in] protype file name
457    * \param ext [in] extension of the file
458    * \param fb [in/out] the file buffer
459    */
460    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
461    {
462      StdStringStream fileNameServer;
463      int numDigit = 0;
464      int commSize = 0;
465      int commRank ;
466      int id;
467     
468      MPI_Comm_size(CXios::getGlobalComm(), &commSize);
469      MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
470
471      while (commSize)
472      {
473        commSize /= 10;
474        ++numDigit;
475      }
476      id = commRank;
477
478      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
479      fb->open(fileNameServer.str().c_str(), std::ios::out);
480      if (!fb->is_open())
481        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
482              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
483    }
484
485    /*!
486    * \brief Open a file stream to write the info logs
487    * Open a file stream with a specific file name suffix+rank
488    * to write the info logs.
489    * \param fileName [in] protype file name
490    */
491    void CServer::openInfoStream(const StdString& fileName)
492    {
493      std::filebuf* fb = m_infoStream.rdbuf();
494      openStream(fileName, ".out", fb);
495
496      info.write2File(fb);
497      report.write2File(fb);
498    }
499
500    //! Write the info logs to standard output
501    void CServer::openInfoStream()
502    {
503      info.write2StdOut();
504      report.write2StdOut();
505    }
506
507    //! Close the info logs file if it opens
508    void CServer::closeInfoStream()
509    {
510      if (m_infoStream.is_open()) m_infoStream.close();
511    }
512
513    /*!
514    * \brief Open a file stream to write the error log
515    * Open a file stream with a specific file name suffix+rank
516    * to write the error log.
517    * \param fileName [in] protype file name
518    */
519    void CServer::openErrorStream(const StdString& fileName)
520    {
521      std::filebuf* fb = m_errorStream.rdbuf();
522      openStream(fileName, ".err", fb);
523
524      error.write2File(fb);
525    }
526
527    //! Write the error log to standard error output
528    void CServer::openErrorStream()
529    {
530      error.write2StdErr();
531    }
532
533    //! Close the error log file if it opens
534    void CServer::closeErrorStream()
535    {
536      if (m_errorStream.is_open()) m_errorStream.close();
537    }
538
539    void CServer::launchServersRessource(MPI_Comm serverComm)
540    {
541      serversRessource_ = new CServersRessource(serverComm) ;
542    }
543
544    void  CServer::finalizeServersRessource(void) 
545    { 
546      delete serversRessource_; serversRessource_=nullptr ;
547    }
548}
Note: See TracBrowser for help on using the repository browser.