source: XIOS/dev/dev_ym/XIOS_COUPLING/src/server.cpp @ 2333

Last change on this file since 2333 was 2333, checked in by jderouillat, 2 years ago

Reintroduced oasis_enddef management on the server side (communications are operated on MPI_COMM_WORLD in Oasis). XIOS clients require to initilialize Oasis by themselves first, and then to specify their local_comm to xios_initialize. Coupled components must be specified through clients_code_id in iodef.xml

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 17.8 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "client.hpp"
6#include "type.hpp"
7#include "context.hpp"
8#include "object_template.hpp"
9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "event_scheduler.hpp"
16#include "string_tools.hpp"
17#include "ressources_manager.hpp"
18#include "services_manager.hpp"
19#include "contexts_manager.hpp"
20#include "servers_ressource.hpp"
21#include "services.hpp"
22#include <cstdio>
23#include "workflow_graph.hpp"
24#include "release_static_allocation.hpp"
25#include <sys/stat.h>
26#include <unistd.h>
27
28
29
30namespace xios
31{
32    MPI_Comm CServer::intraComm_ ;
33    MPI_Comm CServer::serversComm_ ;
34    std::list<MPI_Comm> CServer::interCommLeft ;
35    std::list<MPI_Comm> CServer::interCommRight ;
36    std::list<MPI_Comm> CServer::contextInterComms;
37    std::list<MPI_Comm> CServer::contextIntraComms;
38    int CServer::serverLevel = 0 ;
39    int CServer::nbContexts = 0;
40    bool CServer::isRoot = false ;
41    int CServer::rank_ = INVALID_RANK;
42    StdOFStream CServer::m_infoStream;
43    StdOFStream CServer::m_errorStream;
44    map<string,CContext*> CServer::contextList ;
45    vector<int> CServer::sndServerGlobalRanks;
46    bool CServer::finished=false ;
47    bool CServer::is_MPI_Initialized ;
48    CEventScheduler* CServer::eventScheduler = 0;
49    CServersRessource* CServer::serversRessource_=nullptr ;
50
51       
52    void CServer::initialize(void)
53    {
54     
55      MPI_Comm serverComm ;
56      int initialized ;
57      MPI_Initialized(&initialized) ;
58      if (initialized) is_MPI_Initialized=true ;
59      else is_MPI_Initialized=false ;
60      MPI_Comm globalComm=CXios::getGlobalComm() ;
61
62      /////////////////////////////////////////
63      ///////////// PART 1 ////////////////////
64      /////////////////////////////////////////
65      // don't use OASIS
66      if (!CXios::usingOasis)
67      {
68        if (!is_MPI_Initialized) MPI_Init(NULL, NULL);
69       
70        // split the global communicator
71        // get hash from all model to attribute a unique color (int) and then split to get client communicator
72        // every mpi process of globalComm (MPI_COMM_WORLD) must participate
73         
74        int commRank, commSize ;
75        MPI_Comm_rank(globalComm,&commRank) ;
76        MPI_Comm_size(globalComm,&commSize) ;
77
78        std::hash<string> hashString ;
79        size_t hashServer=hashString(CXios::xiosCodeId) ;
80         
81        size_t* hashAll = new size_t[commSize] ;
82        MPI_Allgather(&hashServer,1,MPI_SIZE_T,hashAll,1,MPI_SIZE_T,globalComm) ;
83         
84        int color=0 ;
85        map<size_t,int> listHash ;
86        for(int i=0 ; i<=commSize ; i++) 
87          if (listHash.count(hashAll[i])==0) 
88          {
89            listHash[hashAll[i]]=color ;
90            color=color+1 ;
91          }
92        color=listHash[hashServer] ;
93        delete[] hashAll ;
94
95        MPI_Comm_split(globalComm, color, commRank, &serverComm) ;
96      }
97      else // using OASIS
98      {
99        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
100
101        oasis_get_localcomm(serverComm);
102        MPI_Comm_dup(serverComm, &intraComm_);
103      }
104      CTimer::get("XIOS").resume() ;
105      CTimer::get("XIOS initialize").resume() ;
106 
107      /////////////////////////////////////////
108      ///////////// PART 2 ////////////////////
109      /////////////////////////////////////////
110     
111
112      // Create the XIOS communicator for every process which is related
113      // to XIOS, as well on client side as on server side
114      MPI_Comm xiosGlobalComm ;
115      string strIds=CXios::getin<string>("clients_code_id","") ;
116      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
117      if (strIds.empty())
118      {
119        // no code Ids given, suppose XIOS initialisation is global           
120        int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ;
121        MPI_Comm splitComm,interComm ;
122        MPI_Comm_rank(globalComm,&commGlobalRank) ;
123        MPI_Comm_split(globalComm, 1, commGlobalRank, &splitComm) ;
124        MPI_Comm_rank(splitComm,&commRank) ;
125        if (commRank==0) serverLeader=commGlobalRank ;
126        else serverLeader=0 ;
127        clientLeader=0 ;
128        MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
129        MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
130        MPI_Intercomm_create(splitComm, 0, globalComm, clientRemoteLeader,1341,&interComm) ;
131        MPI_Intercomm_merge(interComm,false,&xiosGlobalComm) ;
132        CXios::setXiosComm(xiosGlobalComm) ;
133      }
134      else
135      {
136
137        xiosGlobalCommByFileExchange(serverComm) ;
138
139      }
140     
141      /////////////////////////////////////////
142      ///////////// PART 4 ////////////////////
143      //  create servers intra communicator  //
144      /////////////////////////////////////////
145     
146      int commRank ;
147      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
148      MPI_Comm_split(CXios::getXiosComm(),true,commRank,&serversComm_) ;
149     
150      CXios::setUsingServer() ;
151
152      /////////////////////////////////////////
153      ///////////// PART 5 ////////////////////
154      //       redirect files output         //
155      /////////////////////////////////////////
156     
157      CServer::openInfoStream(CXios::serverFile);
158      CServer::openErrorStream(CXios::serverFile);
159
160      /////////////////////////////////////////
161      ///////////// PART 4 ////////////////////
162      /////////////////////////////////////////
163
164      CXios::launchDaemonsManager(true) ;
165     
166      /////////////////////////////////////////
167      ///////////// PART 5 ////////////////////
168      /////////////////////////////////////////
169
170      // create the services
171
172      auto ressourcesManager=CXios::getRessourcesManager() ;
173      auto servicesManager=CXios::getServicesManager() ;
174      auto contextsManager=CXios::getContextsManager() ;
175      auto daemonsManager=CXios::getDaemonsManager() ;
176      auto serversRessource=CServer::getServersRessource() ;
177
178      int rank;
179      MPI_Comm_rank(intraComm_, &rank) ;
180      if (rank==0) isRoot=true;
181      else isRoot=false;
182
183      if (serversRessource->isServerLeader())
184      {
185        int nbRessources = ressourcesManager->getRessourcesSize() ;
186        if (!CXios::usingServer2)
187        {
188          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
189          servicesManager->createServices(CXios::defaultPoolId, CXios::defaultServerId, CServicesManager::IO_SERVER,nbRessources,1) ;
190        }
191        else
192        {
193          int nprocsServer = nbRessources*CXios::ratioServer2/100.;
194          int nprocsGatherer = nbRessources - nprocsServer ;
195         
196          int nbPoolsServer2 = CXios::nbPoolsServer2 ;
197          if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer;
198          ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ;
199          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ;
200          servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServerId, CServicesManager::OUT_SERVER, nprocsServer, nbPoolsServer2) ;
201        }
202      }
203      CTimer::get("XIOS initialize").suspend() ;
204
205      /////////////////////////////////////////
206      ///////////// PART 5 ////////////////////
207      /////////////////////////////////////////
208      // loop on event loop
209
210      bool finished=false ;
211      CTimer::get("XIOS event loop").resume() ;
212
213      while (!finished)
214      {
215        finished=daemonsManager->eventLoop() ;
216      }
217      CTimer::get("XIOS event loop").suspend() ;
218
219      // Delete CContext
220      //CObjectTemplate<CContext>::cleanStaticDataStructure();
221    }
222
223
224
225
226
227    void  CServer::xiosGlobalCommByFileExchange(MPI_Comm serverComm)
228    {
229       
230      MPI_Comm globalComm=CXios::getGlobalComm() ;
231      MPI_Comm xiosGlobalComm ;
232     
233      string strIds=CXios::getin<string>("clients_code_id","") ;
234      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
235     
236      int commRank, globalRank ;
237      MPI_Comm_rank(serverComm, &commRank) ;
238      MPI_Comm_rank(globalComm, &globalRank) ;
239      string serverFileName("__xios_publisher::"+CXios::xiosCodeId+"__to_remove__") ;
240
241      if (commRank==0) // if root process publish name
242      { 
243        std::ofstream ofs (serverFileName, std::ofstream::out);
244        ofs<<globalRank ;
245        ofs.close();
246      }
247       
248      vector<int> clientsRank(clientsCodeId.size()) ;
249      for(int i=0;i<clientsRank.size();i++)
250      {
251        std::ifstream ifs ;
252        string fileName=("__xios_publisher::"+clientsCodeId[i]+"__to_remove__") ;
253        struct stat buffer;
254        do {
255        } while( stat(fileName.c_str(), &buffer) != 0 );
256        sleep(1);
257        ifs.open(fileName, ifstream::in) ;
258        ifs>>clientsRank[i] ;
259        //cout <<  "\t\t read: " << clientsRank[i] << " in " << fileName << endl;
260        ifs.close() ; 
261      }
262
263      MPI_Comm intraComm ;
264      MPI_Comm_dup(serverComm,&intraComm) ;
265      MPI_Comm interComm ;
266      for(int i=0 ; i<clientsRank.size(); i++)
267      { 
268        MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm);
269        interCommLeft.push_back(interComm) ;
270        MPI_Comm_free(&intraComm) ;
271        MPI_Intercomm_merge(interComm,false, &intraComm ) ;
272      }
273      xiosGlobalComm=intraComm ; 
274      MPI_Barrier(xiosGlobalComm);
275      if (commRank==0) std::remove(serverFileName.c_str()) ;
276      MPI_Barrier(xiosGlobalComm);
277
278      CXios::setXiosComm(xiosGlobalComm) ;
279     
280    }
281
282
283    void  CServer::xiosGlobalCommByPublishing(MPI_Comm serverComm)
284    {
285        // untested, need to be tested on a true MPI-2 compliant library
286
287        // try to discover other client/server
288/*
289        // publish server name
290        char portName[MPI_MAX_PORT_NAME];
291        int ierr ;
292        int commRank ;
293        MPI_Comm_rank(serverComm, &commRank) ;
294       
295        if (commRank==0) // if root process publish name
296        { 
297          MPI_Open_port(MPI_INFO_NULL, portName);
298          MPI_Publish_name(CXios::xiosCodeId.c_str(), MPI_INFO_NULL, portName);
299        }
300
301        MPI_Comm intraComm=serverComm ;
302        MPI_Comm interComm ;
303        for(int i=0 ; i<clientsCodeId.size(); i++)
304        { 
305          MPI_Comm_accept(portName, MPI_INFO_NULL, 0, intraComm, &interComm);
306          MPI_Intercomm_merge(interComm,false, &intraComm ) ;
307        }
308*/     
309    }
310
311   /*!
312    * Root process is listening for an order sent by client to call "oasis_enddef".
313    * The root client of a compound send the order (tag 5). It is probed and received.
314    * When the order has been received from each coumpound, the server root process ping the order to the root processes of the secondary levels of servers (if any).
315    * After, it also inform (asynchronous call) other processes of the communicator that the oasis_enddef call must be done
316    */
317   
318     void CServer::listenOasisEnddef(void)
319     {
320        int flag ;
321        MPI_Status status ;
322        list<MPI_Comm>::iterator it;
323        int msg ;
324        static int nbCompound=0 ;
325        int size ;
326        static bool sent=false ;
327        static MPI_Request* allRequests ;
328        static MPI_Status* allStatus ;
329
330
331        if (sent)
332        {
333          MPI_Comm_size(intraComm_,&size) ;
334          MPI_Testall(size,allRequests, &flag, allStatus) ;
335          if (flag==true)
336          {
337            delete [] allRequests ;
338            delete [] allStatus ;
339            sent=false ;
340          }
341        }
342       
343
344        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
345        {
346           MPI_Status status ;
347           traceOff() ;
348           MPI_Iprobe(0,5,*it,&flag,&status) ;  // tags oasis_endded = 5
349           traceOn() ;
350           if (flag==true)
351           {
352              MPI_Recv(&msg,1,MPI_INT,0,5,*it,&status) ; // tags oasis_endded = 5
353              nbCompound++ ;
354              if (nbCompound==interCommLeft.size())
355              {
356                MPI_Comm_size(intraComm_,&size) ;
357                allRequests= new MPI_Request[size] ;
358                allStatus= new MPI_Status[size] ;
359                for(int i=0;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,5,intraComm_,&allRequests[i]) ; // tags oasis_endded = 5
360                sent=true ;
361              }
362           }
363        }
364}
365     
366   /*!
367    * Processes probes message from root process if oasis_enddef call must be done.
368    * When the order is received it is scheduled to be treated in a synchronized way by all server processes of the communicator
369    */
370     void CServer::listenRootOasisEnddef(void)
371     {
372       int flag ;
373       MPI_Status status ;
374       const int root=0 ;
375       int msg ;
376       static bool eventSent=false ;
377
378       if (eventSent)
379       {
380         boost::hash<string> hashString;
381         size_t hashId = hashString("oasis_enddef");
382         if (CXios::getPoolRessource()->getService(CXios::defaultServerId,0)->getEventScheduler()->queryEvent(0,hashId))
383         {
384           CXios::getPoolRessource()->getService(CXios::defaultServerId,0)->getEventScheduler()->popEvent() ;
385           oasis_enddef() ;
386           eventSent=false ;
387         }
388       }
389         
390       traceOff() ;
391       MPI_Iprobe(root,5,intraComm_, &flag, &status) ;
392       traceOn() ;
393       if (flag==true)
394       {
395           MPI_Recv(&msg,1,MPI_INT,root,5,intraComm_,&status) ; // tags oasis_endded = 5
396           boost::hash<string> hashString;
397           size_t hashId = hashString("oasis_enddef");
398           CXios::getPoolRessource()->getService(CXios::defaultServerId,0)->getEventScheduler()->registerEvent(0,hashId);
399           eventSent=true ;
400       }
401     }
402
403    void CServer::finalize(void)
404    {
405      CTimer::get("XIOS").suspend() ;
406     
407      delete eventScheduler ;
408
409      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
410        MPI_Comm_free(&(*it));
411
412      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
413        MPI_Comm_free(&(*it));
414
415        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
416          MPI_Comm_free(&(*it));
417
418//      MPI_Comm_free(&intraComm);
419      CXios::finalizeDaemonsManager();
420      finalizeServersRessource();
421     
422      CContext::removeAllContexts() ; // free memory for related context
423         
424      CXios::getMpiGarbageCollector().release() ; // release unfree MPI ressources
425
426      if (!is_MPI_Initialized)
427      {
428        if (CXios::usingOasis) oasis_finalize();
429        else MPI_Finalize() ;
430      }
431      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
432      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
433      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
434      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
435     
436      CWorkflowGraph::drawWorkFlowGraph_server();
437      xios::releaseStaticAllocation() ; // free memory from static allocation
438    }
439
440    /*!
441    * Open a file specified by a suffix and an extension and use it for the given file buffer.
442    * The file name will be suffix+rank+extension.
443    *
444    * \param fileName[in] protype file name
445    * \param ext [in] extension of the file
446    * \param fb [in/out] the file buffer
447    */
448    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
449    {
450      StdStringStream fileNameServer;
451      int numDigit = 0;
452      int commSize = 0;
453      int commRank ;
454      int id;
455     
456      MPI_Comm_size(CXios::getGlobalComm(), &commSize);
457      MPI_Comm_rank(CXios::getGlobalComm(), &commRank);
458
459      while (commSize)
460      {
461        commSize /= 10;
462        ++numDigit;
463      }
464      id = commRank;
465
466      fileNameServer << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
467      fb->open(fileNameServer.str().c_str(), std::ios::out);
468      if (!fb->is_open())
469        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
470              << std::endl << "Can not open <" << fileNameServer.str() << "> file to write the server log(s).");
471    }
472
473    /*!
474    * \brief Open a file stream to write the info logs
475    * Open a file stream with a specific file name suffix+rank
476    * to write the info logs.
477    * \param fileName [in] protype file name
478    */
479    void CServer::openInfoStream(const StdString& fileName)
480    {
481      std::filebuf* fb = m_infoStream.rdbuf();
482      openStream(fileName, ".out", fb);
483
484      info.write2File(fb);
485      report.write2File(fb);
486    }
487
488    //! Write the info logs to standard output
489    void CServer::openInfoStream()
490    {
491      info.write2StdOut();
492      report.write2StdOut();
493    }
494
495    //! Close the info logs file if it opens
496    void CServer::closeInfoStream()
497    {
498      if (m_infoStream.is_open()) m_infoStream.close();
499    }
500
501    /*!
502    * \brief Open a file stream to write the error log
503    * Open a file stream with a specific file name suffix+rank
504    * to write the error log.
505    * \param fileName [in] protype file name
506    */
507    void CServer::openErrorStream(const StdString& fileName)
508    {
509      std::filebuf* fb = m_errorStream.rdbuf();
510      openStream(fileName, ".err", fb);
511
512      error.write2File(fb);
513    }
514
515    //! Write the error log to standard error output
516    void CServer::openErrorStream()
517    {
518      error.write2StdErr();
519    }
520
521    //! Close the error log file if it opens
522    void CServer::closeErrorStream()
523    {
524      if (m_errorStream.is_open()) m_errorStream.close();
525    }
526
527    void CServer::launchServersRessource(MPI_Comm serverComm)
528    {
529      serversRessource_ = new CServersRessource(serverComm) ;
530    }
531
532    void  CServer::finalizeServersRessource(void) 
533    { 
534      delete serversRessource_; serversRessource_=nullptr ;
535    }
536}
Note: See TracBrowser for help on using the repository browser.