source: XIOS3/trunk/src/server.cpp @ 2437

Last change on this file since 2437 was 2437, checked in by jderouillat, 19 months ago

On servers, move first call of CTimer after the first step of servers initialisations (IntelMPI considers MPI_Wtime as a MPI call, need to be after MPI_Init)

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 18.6 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "client.hpp"
6#include "type.hpp"
7#include "context.hpp"
8#include "object_template.hpp"
9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "mem_checker.hpp"
16#include "event_scheduler.hpp"
17#include "string_tools.hpp"
18#include "ressources_manager.hpp"
19#include "services_manager.hpp"
20#include "contexts_manager.hpp"
21#include "servers_ressource.hpp"
22#include "services.hpp"
23#include <cstdio>
24#include "workflow_graph.hpp"
25#include "release_static_allocation.hpp"
26#include <sys/stat.h>
27#include <unistd.h>
28
29
30
31namespace xios
32{
33    MPI_Comm CServer::intraComm_ ;
34    MPI_Comm CServer::serversComm_ ;
35    std::list<MPI_Comm> CServer::interCommLeft ;
36    std::list<MPI_Comm> CServer::interCommRight ;
37    std::list<MPI_Comm> CServer::contextInterComms;
38    std::list<MPI_Comm> CServer::contextIntraComms;
39    int CServer::serverLevel = 0 ;
40    int CServer::nbContexts = 0;
41    bool CServer::isRoot = false ;
42    int CServer::rank_ = INVALID_RANK;
43    StdOFStream CServer::m_infoStream;
44    StdOFStream CServer::m_errorStream;
45    map<string,CContext*> CServer::contextList ;
46    vector<int> CServer::sndServerGlobalRanks;
47    bool CServer::finished=false ;
48    bool CServer::is_MPI_Initialized ;
49    CEventScheduler* CServer::eventScheduler = 0;
50    CServersRessource* CServer::serversRessource_=nullptr ;
51    CThirdPartyDriver* CServer::driver_ =nullptr ;
52
53       
54    void CServer::initialize(void)
55    {
56     
57      MPI_Comm serverComm ;
58      int initialized ;
59      MPI_Initialized(&initialized) ;
60      if (initialized) is_MPI_Initialized=true ;
61      else is_MPI_Initialized=false ;
62      MPI_Comm globalComm=CXios::getGlobalComm() ;
63      /////////////////////////////////////////
64      ///////////// PART 1 ////////////////////
65      /////////////////////////////////////////
66      // don't use OASIS
67      if (!CXios::usingOasis)
68      {
69        if (!is_MPI_Initialized) MPI_Init(NULL, NULL);
70       
71        // split the global communicator
72        // get hash from all model to attribute a unique color (int) and then split to get client communicator
73        // every mpi process of globalComm (MPI_COMM_WORLD) must participate
74         
75        int commRank, commSize ;
76        MPI_Comm_rank(globalComm,&commRank) ;
77        MPI_Comm_size(globalComm,&commSize) ;
78
79        std::hash<string> hashString ;
80        size_t hashServer=hashString(CXios::xiosCodeId) ;
81         
82        size_t* hashAll = new size_t[commSize] ;
83        MPI_Allgather(&hashServer,1,MPI_SIZE_T,hashAll,1,MPI_SIZE_T,globalComm) ;
84         
85        int color=0 ;
86        map<size_t,int> listHash ;
87        for(int i=0 ; i<=commSize ; i++) 
88          if (listHash.count(hashAll[i])==0) 
89          {
90            listHash[hashAll[i]]=color ;
91            color=color+1 ;
92          }
93        color=listHash[hashServer] ;
94        delete[] hashAll ;
95
96        MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
97      }
98      else // using OASIS
99      {
100        if (!is_MPI_Initialized) driver_ = new CThirdPartyDriver();
101
102        driver_->getComponentCommunicator( serverComm );
103      }
104      MPI_Comm_dup(serverComm, &intraComm_);
105     
106      CTimer::get("XIOS").resume() ;
107      CTimer::get("XIOS server").resume() ;
108      CTimer::get("XIOS initialize").resume() ;
109 
110      /////////////////////////////////////////
111      ///////////// PART 2 ////////////////////
112      /////////////////////////////////////////
113     
114
115      // Create the XIOS communicator for every process which is related
116      // to XIOS, as well on client side as on server side
117      MPI_Comm xiosGlobalComm ;
118      string strIds=CXios::getin<string>("clients_code_id","") ;
119      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
120      if (strIds.empty())
121      {
122        // no code Ids given, suppose XIOS initialisation is global           
123        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
124        MPI_Comm splitComm,interComm ;
125        MPI_Comm_rank(globalComm,&commGlobalRank) ;
126        MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
127        MPI_Comm_rank(splitComm,&commRank) ;
128        if (commRank==0) serverLeader=commGlobalRank ;
129        else serverLeader=0 ;
130        clientLeader=0 ;
131        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
132        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
133        MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
134        MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
135        CXios::setXiosComm(xiosGlobalComm) ;
136      }
137      else
138      {
139
140        xiosGlobalCommByFileExchange(serverComm) ;
141
142      }
143     
144      /////////////////////////////////////////
145      ///////////// PART 4 ////////////////////
146      //  create servers intra communicator  //
147      /////////////////////////////////////////
148     
149      int commRank ;
150      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
151      MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
152     
153      CXios::setUsingServer() ;
154
155      /////////////////////////////////////////
156      ///////////// PART 5 ////////////////////
157      //       redirect files output         //
158      /////////////////////////////////////////
159     
160      CServer::openInfoStream(CXios::serverFile);
161      CServer::openErrorStream(CXios::serverFile);
162
163      CMemChecker::logMem( "CServer::initialize" );
164
165      /////////////////////////////////////////
166      ///////////// PART 4 ////////////////////
167      /////////////////////////////////////////
168
169      CXios::launchDaemonsManager(true) ;
170     
171      /////////////////////////////////////////
172      ///////////// PART 5 ////////////////////
173      /////////////////////////////////////////
174
175      // create the services
176
177      auto ressourcesManager=CXios::getRessourcesManager() ;
178      auto servicesManager=CXios::getServicesManager() ;
179      auto contextsManager=CXios::getContextsManager() ;
180      auto daemonsManager=CXios::getDaemonsManager() ;
181      auto serversRessource=CServer::getServersRessource() ;
182
183      int rank;
184      MPI_Comm_rank(intraComm_, &rank) ;
185      if (rank==0) isRoot=true;
186      else isRoot=false;
187
188      if (serversRessource->isServerLeader())
189      {
190        int nbRessources = ressourcesManager->getRessourcesSize() ;
191        if (!CXios::usingServer2)
192        {
193          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
194          servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ;
195          servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ;
196        }
197        else
198        {
199          int nprocsServer = nbRessources*CXios::ratioServer2/100.;
200          int nprocsGatherer = nbRessources - nprocsServer ;
201         
202          int nbPoolsServer2 = CXios::nbPoolsServer2 ;
203          if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
204          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
205          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
206          servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ;
207          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ;
208
209
210        }
211//        servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ;
212      }
213      CTimer::get("XIOS initialize").suspend() ;
214
215      /////////////////////////////////////////
216      ///////////// PART 5 ////////////////////
217      /////////////////////////////////////////
218      // loop on event loop
219
220      bool finished=false ;
221      CTimer::get("XIOS event loop").resume() ;
222
223      while (!finished)
224      {
225        finished=daemonsManager->eventLoop() ;
226      }
227      CTimer::get("XIOS event loop").suspend() ;
228
229      // Delete CContext
230      //CObjectTemplate<CContext>::cleanStaticDataStructure();
231    }
232
233
234
235
236
237    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
238    {
239       
240      MPI_Comm globalComm=CXios::getGlobalComm() ;
241      MPI_Comm xiosGlobalComm ;
242     
243      string strIds=CXios::getin<string>("clients_code_id","") ;
244      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
245     
246      int commRank, globalRank ;
247      MPI_Comm_rank(serverComm, &commRank) ;
248      MPI_Comm_rank(globalComm, &globalRank) ;
249      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
250
251      if (commRank==0) // if root process publish name
252      { 
253        std::ofstream ofs (serverFileName, std::ofstream::out);
254        ofs<<globalRank ;
255        ofs.close();
256      }
257       
258      vector<int> clientsRank(clientsCodeId.size()) ;
259      for(int i=0;i<clientsRank.size();i++)
260      {
261        std::ifstream ifs ;
262        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
263        struct stat buffer;
264        do {
265        } while( stat(fileName.c_str(), &buffer) != 0 );
266        sleep(1);
267        ifs.open(fileName, ifstream::in) ;
268        ifs>>clientsRank[i] ;
269        //cout <<  "\t\t read: " << clientsRank[i] << " in " << fileName << endl;
270        ifs.close() ; 
271      }
272
273      MPI_Comm intraComm ;
274      MPI_Comm_dup(serverComm,&intraComm) ;
275      MPI_Comm interComm ;
276      for(int i=0 ; i<clientsRank.size(); i++)
277      { 
278        MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
279        interCommLeft.push_back(interComm) ;
280        MPI_Comm_free(&intraComm) ;
281        MPI_Intercomm_merge(interComm,false, &intraComm ) ;
282      }
283      xiosGlobalComm=intraComm ; 
284      MPI_Barrier(xiosGlobalComm);
285      if (commRank==0) std::remove(serverFileName.c_str()) ;
286      MPI_Barrier(xiosGlobalComm);
287
288      CXios::setXiosComm(xiosGlobalComm) ;
289     
290    }
291
292
293    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
294    {
295        // untested, need to be tested on a true MPI-2 compliant library
296
297        // try to discover other client/server
298/*
299        // publish server name
300        char portName[MPI_MAX_PORT_NAME];
301        int ierr ;
302        int commRank ;
303        MPI_Comm_rank(serverComm, &commRank) ;
304       
305        if (commRank==0) // if root process publish name
306        { 
307          MPI_Open_port(MPI_INFO_NULL, portName);
308          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
309        }
310
311        MPI_Comm intraComm=serverComm ;
312        MPI_Comm interComm ;
313        for(int i=0 ; i<clientsCodeId.size(); i++)
314        { 
315          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
316          MPI_Intercomm_merge(interComm,false, &intraComm ) ;
317        }
318*/     
319    }
320
321   /*!
322    * Root process is listening for an order sent by client to call "oasis_enddef".
323    * The root client of a compound send the order (tag 5). It is probed and received.
324    * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any).
325    * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done
326    */
327   
328     void CServer::listenOasisEnddef(void)
329     {
330        int flag ;
331        MPI_Status status ;
332        list<MPI_Comm>::iterator it;
333        int msg ;
334        static int nbCompound=0 ;
335        int size ;
336        static bool sent=false ;
337        static MPI_Request* allRequests ;
338        static MPI_Status* allStatus ;
339
340
341        if (sent)
342        {
343          MPI_Comm_size(intraComm_,&size) ;
344          MPI_Testall(size,allRequests, &flag, allStatus) ;
345          if (flag==true)
346          {
347            delete [] allRequests ;
348            delete [] allStatus ;
349            sent=false ;
350          }
351        }
352       
353
354        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
355        {
356           MPI_Status status ;
357           traceOff() ;
358           MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5
359           traceOn() ;
360           if (flag==true)
361           {
362              MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5
363              nbCompound++ ;
364              if (nbCompound==interCommLeft.size())
365              {
366                MPI_Comm_size(intraComm_,&size) ;
367                allRequests= new MPI_Request[size] ;
368                allStatus= new MPI_Status[size] ;
369                for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm_,&allRequests[i]) ; // tags oasis_endded = 5
370                sent=true ;
371              }
372           }
373        }
374}
375     
376   /*!
377    * Processes probes message from root process if oasis_enddef call must be done.
378    * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator
379    */
380     void CServer::listenRootOasisEnddef(void)
381     {
382       int flag ;
383       MPI_Status status ;
384       const int root=0 ;
385       int msg ;
386       static bool eventSent=false ;
387
388       if (eventSent)
389       {
390         boost::hash<string> hashString;
391         size_t hashId = hashString("oasis_enddef");
392         if (CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->queryEvent(0,hashId))
393         {
394           CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->popEvent() ;
395           driver_->endSynchronizedDefinition() ;
396           eventSent=false ;
397         }
398       }
399         
400       traceOff() ;
401       MPI_Iprobe(root,5,intraComm_, &flag, &status) ;
402       traceOn() ;
403       if (flag==true)
404       {
405           MPI_Recv(&msg,1,MPI_INT,root,5,intraComm_,&status) ; // tags oasis_endded = 5
406           boost::hash<string> hashString;
407           size_t hashId = hashString("oasis_enddef");
408           CXios::getPoolRessource()->getService(CXios::defaultServicesId,0)->getEventScheduler()->registerEvent(0,hashId);
409           eventSent=true ;
410       }
411     }
412
413    void CServer::finalize(void)
414    {
415      CTimer::get("XIOS").suspend() ;
416      CTimer::get("XIOS server").suspend() ;
417      delete eventScheduler ;
418
419      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
420        MPI_Comm_free(&(*it));
421
422      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
423        MPI_Comm_free(&(*it));
424
425        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
426          MPI_Comm_free(&(*it));
427
428//      MPI_Comm_free(&intraComm);
429      CXios::finalizeDaemonsManager();
430      finalizeServersRessource();
431     
432      CContext::removeAllContexts() ; // free memory for related context
433         
434      CXios::getMpiGarbageCollector().release() ; // release unfree MPI ressources
435
436      CMemChecker::logMem( "CServer::finalize", true );
437      if (!is_MPI_Initialized)
438      {
439        if (CXios::usingOasis) delete driver_;
440        else MPI_Finalize() ;
441      }
442      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
443      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
444      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
445      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
446      report(100)<<CMemChecker::getAllCumulatedMem()<<endl ;
447     
448      CWorkflowGraph::drawWorkFlowGraph_server();
449      xios::releaseStaticAllocation() ; // free memory from static allocation
450    }
451
452    /*!
453    * Open a file specified by a suffix and an extension and use it for the given file buffer.
454    * The file name will be suffix+rank+extension.
455    *
456    * \param fileName[in] protype file name
457    * \param ext [in] extension of the file
458    * \param fb [in/out] the file buffer
459    */
460    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
461    {
462      StdStringStream fileNameServer;
463      int numDigit = 0;
464      int commSize = 0;
465      int commRank ;
466      int id;
467     
468      MPI_Comm_size(CXios::getGlobalComm(), &commSize);
469      MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
470
471      while (commSize)
472      {
473        commSize /= 10;
474        ++numDigit;
475      }
476      id = commRank;
477
478      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
479      fb->open(fileNameServer.str().c_str(), std::ios::out);
480      if (!fb->is_open())
481        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
482              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
483    }
484
485    /*!
486    * \brief Open a file stream to write the info logs
487    * Open a file stream with a specific file name suffix+rank
488    * to write the info logs.
489    * \param fileName [in] protype file name
490    */
491    void CServer::openInfoStream(const StdString& fileName)
492    {
493      std::filebuf* fb = m_infoStream.rdbuf();
494      openStream(fileName, ".out", fb);
495
496      info.write2File(fb);
497      report.write2File(fb);
498    }
499
500    //! Write the info logs to standard output
501    void CServer::openInfoStream()
502    {
503      info.write2StdOut();
504      report.write2StdOut();
505    }
506
507    //! Close the info logs file if it opens
508    void CServer::closeInfoStream()
509    {
510      if (m_infoStream.is_open()) m_infoStream.close();
511    }
512
513    /*!
514    * \brief Open a file stream to write the error log
515    * Open a file stream with a specific file name suffix+rank
516    * to write the error log.
517    * \param fileName [in] protype file name
518    */
519    void CServer::openErrorStream(const StdString& fileName)
520    {
521      std::filebuf* fb = m_errorStream.rdbuf();
522      openStream(fileName, ".err", fb);
523
524      error.write2File(fb);
525    }
526
527    //! Write the error log to standard error output
528    void CServer::openErrorStream()
529    {
530      error.write2StdErr();
531    }
532
533    //! Close the error log file if it opens
534    void CServer::closeErrorStream()
535    {
536      if (m_errorStream.is_open()) m_errorStream.close();
537    }
538
539    void CServer::launchServersRessource(MPI_Comm serverComm)
540    {
541      serversRessource_ = new CServersRessource(serverComm) ;
542    }
543
544    void  CServer::finalizeServersRessource(void) 
545    { 
546      delete serversRessource_; serversRessource_=nullptr ;
547    }
548}
Note: See TracBrowser for help on using the repository browser.