source: XIOS/trunk/src/server.cpp @ 539

Last change on this file since 539 was 523, checked in by rlacroix, 9 years ago

Improve the message error handling by mimicking the behavior of the info/report logs.

Output the error messages to the standart error message until the context is correctly initialized. Then, output the error messages to a file if the user has set "print_file" parameter to "true".

  • Fix: Errors that occured before MPI was initialized (e.g. during the config file parsing) caused a MPI error on top of the original error.
  • Fix: The error file could sometimes be misnamed if the error happened before the context was completely known.
  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 13.4 KB
RevLine 
[490]1#include "globalScopeData.hpp"
[300]2#include "xmlioserver_spl.hpp"
3#include "cxios.hpp"
[342]4#include "server.hpp"
[300]5#include "type.hpp"
6#include "context.hpp"
[352]7#include "object_template.hpp"
[300]8#include "oasis_cinterface.hpp"
9#include <boost/functional/hash.hpp>
10#include <boost/algorithm/string.hpp>
[382]11#include "mpi.hpp"
[347]12#include "tracer.hpp"
13#include "timer.hpp"
[492]14#include "event_scheduler.hpp"
[300]15
[335]16namespace xios
[490]17{
[300]18    MPI_Comm CServer::intraComm ;
19    list<MPI_Comm> CServer::interComm ;
20    bool CServer::isRoot ;
[490]21    int CServer::rank = INVALID_RANK;
22    StdOFStream CServer::m_infoStream;
[523]23    StdOFStream CServer::m_errorStream;
[490]24    map<string,CContext*> CServer::contextList ;
[300]25    bool CServer::finished=false ;
26    bool CServer::is_MPI_Initialized ;
[492]27    CEventScheduler* CServer::eventScheduler ;
[490]28
[300]29    void CServer::initialize(void)
30    {
31      int initialized ;
32      MPI_Initialized(&initialized) ;
33      if (initialized) is_MPI_Initialized=true ;
34      else is_MPI_Initialized=false ;
[490]35
[300]36      // Not using OASIS
37      if (!CXios::usingOasis)
38      {
[490]39
40        if (!is_MPI_Initialized)
[300]41        {
42          int argc=0;
43          char** argv=NULL;
44          MPI_Init(&argc,&argv) ;
45        }
[359]46        CTimer::get("XIOS").resume() ;
[490]47
48        boost::hash<string> hashString ;
49
[300]50        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
51        unsigned long* hashAll ;
[490]52
53//        int rank ;
[300]54        int size ;
55        int myColor ;
56        int i,c ;
57        MPI_Comm newComm ;
[490]58
[300]59        MPI_Comm_size(CXios::globalComm,&size) ;
60        MPI_Comm_rank(CXios::globalComm,&rank);
61        hashAll=new unsigned long[size] ;
[490]62
[300]63        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
64
65        map<unsigned long, int> colors ;
66        map<unsigned long, int> leaders ;
67        map<unsigned long, int>::iterator it ;
[490]68
[300]69        for(i=0,c=0;i<size;i++)
70        {
71          if (colors.find(hashAll[i])==colors.end())
72          {
73            colors[hashAll[i]]=c ;
74            leaders[hashAll[i]]=i ;
75            c++ ;
76          }
77        }
[490]78
[300]79        myColor=colors[hashServer] ;
80        MPI_Comm_split(MPI_COMM_WORLD,myColor,rank,&intraComm) ;
81
82        int serverLeader=leaders[hashServer] ;
83        int clientLeader;
[490]84
[300]85         serverLeader=leaders[hashServer] ;
86         for(it=leaders.begin();it!=leaders.end();it++)
87         {
88           if (it->first!=hashServer)
89           {
90             clientLeader=it->second ;
[492]91             int intraCommSize, intraCommRank ;
92             MPI_Comm_size(intraComm,&intraCommSize) ;
93             MPI_Comm_rank(intraComm,&intraCommRank) ;
[493]94             info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
95                     <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
[490]96
[300]97             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
98             interComm.push_back(newComm) ;
99           }
100         }
101
102         delete [] hashAll ;
103      }
104      // using OASIS
105      else
106      {
[490]107//        int rank ,size;
108        int size;
109        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
110
[359]111        CTimer::get("XIOS").resume() ;
[300]112        oasis_get_localcomm(intraComm) ;
113        MPI_Comm_rank(intraComm,&rank) ;
114        MPI_Comm_size(intraComm,&size) ;
115        string codesId=CXios::getin<string>("oasis_codes_id") ;
[490]116
[300]117        vector<string> splitted ;
[483]118        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
[300]119        vector<string>::iterator it ;
120
121        MPI_Comm newComm ;
122        int globalRank ;
123        MPI_Comm_rank(CXios::globalComm,&globalRank);
[490]124
[300]125        for(it=splitted.begin();it!=splitted.end();it++)
126        {
127          oasis_get_intercomm(newComm,*it) ;
128          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
129          MPI_Comm_remote_size(newComm,&size);
130          interComm.push_back(newComm) ;
131        }
[492]132              oasis_enddef() ;
[300]133      }
[490]134
135//      int rank;
[300]136      MPI_Comm_rank(intraComm,&rank) ;
137      if (rank==0) isRoot=true;
[490]138      else isRoot=false;
[492]139     
140      eventScheduler = new CEventScheduler(intraComm) ;
[300]141    }
[490]142
[300]143    void CServer::finalize(void)
144    {
[361]145      CTimer::get("XIOS").suspend() ;
[492]146     
147      delete eventScheduler ;
148     
[300]149      if (!is_MPI_Initialized)
[490]150      {
[300]151        if (CXios::usingOasis) oasis_finalize();
152        else MPI_Finalize() ;
153      }
[347]154      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
155      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
156      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
[300]157    }
[490]158
[300]159     void CServer::eventLoop(void)
160     {
161       bool stop=false ;
[490]162
[347]163       CTimer::get("XIOS server").resume() ;
[300]164       while(!stop)
165       {
166         if (isRoot)
167         {
168           listenContext();
169           if (!finished) listenFinalize() ;
170         }
171         else
172         {
173           listenRootContext();
174           if (!finished) listenRootFinalize() ;
175         }
[490]176
[300]177         contextEventLoop() ;
178         if (finished && contextList.empty()) stop=true ;
[492]179         if (! CXios::isServer) eventScheduler->checkEvent() ;
[300]180       }
[347]181       CTimer::get("XIOS server").suspend() ;
[300]182     }
[490]183
[300]184     void CServer::listenFinalize(void)
185     {
186        list<MPI_Comm>::iterator it;
187        int msg ;
188        int flag ;
[490]189
[300]190        for(it=interComm.begin();it!=interComm.end();it++)
191        {
192           MPI_Status status ;
[347]193           traceOff() ;
[300]194           MPI_Iprobe(0,0,*it,&flag,&status) ;
[347]195           traceOn() ;
[300]196           if (flag==true)
197           {
198              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
199              info(20)<<" CServer : Receive client finalize"<<endl ;
200              interComm.erase(it) ;
201              break ;
202            }
203         }
[490]204
[300]205         if (interComm.empty())
206         {
207           int i,size ;
208           MPI_Comm_size(intraComm,&size) ;
209           MPI_Request* requests= new MPI_Request[size-1] ;
210           MPI_Status* status= new MPI_Status[size-1] ;
[490]211
[300]212           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
213           MPI_Waitall(size-1,requests,status) ;
214
215           finished=true ;
216           delete [] requests ;
217           delete [] status ;
218         }
219     }
[490]220
221
[300]222     void CServer::listenRootFinalize()
223     {
224        int flag ;
225        MPI_Status status ;
226        int msg ;
[490]227
[347]228        traceOff() ;
[300]229        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
[347]230        traceOn() ;
[300]231        if (flag==true)
232        {
233           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
234           finished=true ;
235        }
236      }
[490]237
[300]238     void CServer::listenContext(void)
239     {
[490]240
[300]241       MPI_Status status ;
242       int flag ;
243       static void* buffer ;
244       static MPI_Request request ;
245       static bool recept=false ;
246       int rank ;
247       int count ;
[490]248
[300]249       if (recept==false)
250       {
[347]251         traceOff() ;
[300]252         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
[347]253         traceOn() ;
[490]254         if (flag==true)
[300]255         {
256           rank=status.MPI_SOURCE ;
257           MPI_Get_count(&status,MPI_CHAR,&count) ;
258           buffer=new char[count] ;
259           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
[490]260           recept=true ;
[300]261         }
262       }
263       else
264       {
[347]265         traceOff() ;
[300]266         MPI_Test(&request,&flag,&status) ;
[347]267         traceOn() ;
[300]268         if (flag==true)
269         {
270           rank=status.MPI_SOURCE ;
271           MPI_Get_count(&status,MPI_CHAR,&count) ;
272           recvContextMessage(buffer,count) ;
273           delete [] buffer ;
[490]274           recept=false ;
[300]275         }
276       }
277     }
[490]278
[300]279     void CServer::recvContextMessage(void* buff,int count)
280     {
281       static map<string,contextMessage> recvContextId ;
282       map<string,contextMessage>::iterator it ;
[490]283
[300]284       CBufferIn buffer(buff,count) ;
285       string id ;
286       int clientLeader ;
287       int nbMessage ;
288
289       buffer>>id>>nbMessage>>clientLeader ;
[490]290
[300]291       it=recvContextId.find(id) ;
292       if (it==recvContextId.end())
[490]293       {
[300]294         contextMessage msg={0,0} ;
295         pair<map<string,contextMessage>::iterator,bool> ret ;
296         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
297         it=ret.first ;
[490]298       }
[300]299       it->second.nbRecv+=1 ;
300       it->second.leaderRank+=clientLeader ;
[490]301
[300]302       if (it->second.nbRecv==nbMessage)
[490]303       {
[300]304         int size ;
305         MPI_Comm_size(intraComm,&size) ;
306         MPI_Request* requests= new MPI_Request[size-1] ;
307         MPI_Status* status= new MPI_Status[size-1] ;
[490]308
[300]309         for(int i=1;i<size;i++)
310         {
311            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
312         }
313         MPI_Waitall(size-1,requests,status) ;
314         registerContext(buff,count,it->second.leaderRank) ;
315
316         recvContextId.erase(it) ;
317         delete [] requests ;
318         delete [] status ;
319
320       }
[490]321     }
322
[300]323     void CServer::listenRootContext(void)
324     {
[490]325
[300]326       MPI_Status status ;
327       int flag ;
328       static void* buffer ;
329       static MPI_Request request ;
330       static bool recept=false ;
331       int rank ;
332       int count ;
333       const int root=0 ;
[490]334
[300]335       if (recept==false)
336       {
[347]337         traceOff() ;
[300]338         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
[347]339         traceOn() ;
[490]340         if (flag==true)
[300]341         {
342           MPI_Get_count(&status,MPI_CHAR,&count) ;
343           buffer=new char[count] ;
344           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
[490]345           recept=true ;
[300]346         }
347       }
348       else
349       {
350         MPI_Test(&request,&flag,&status) ;
351         if (flag==true)
352         {
353           MPI_Get_count(&status,MPI_CHAR,&count) ;
354           registerContext(buffer,count) ;
355           delete [] buffer ;
[490]356           recept=false ;
[300]357         }
358       }
[490]359     }
360
361
362
[300]363     void CServer::registerContext(void* buff,int count, int leaderRank)
364     {
[490]365
[300]366       string contextId;
367       CBufferIn buffer(buff,count) ;
368
369       buffer>>contextId ;
370       MPI_Comm contextIntercomm ;
371       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm) ;
[490]372
[300]373       info(20)<<"CServer : Register new Context : "<<contextId<<endl  ;
374       MPI_Comm inter ;
375       MPI_Intercomm_merge(contextIntercomm,1,&inter) ;
376       MPI_Barrier(inter) ;
[490]377       if (contextList.find(contextId)!=contextList.end())
[300]378        ERROR("void CServer::registerContext(void* buff,int count, int leaderRank)",
379              <<"Context has already been registred") ;
[490]380
[347]381      CContext* context=CContext::create(contextId) ;
382      contextList[contextId]=context ;
[300]383      context->initServer(intraComm,contextIntercomm) ;
[490]384
385     }
386
387
[300]388     void CServer::contextEventLoop(void)
389     {
390       bool finished ;
391       map<string,CContext*>::iterator it ;
[490]392       for(it=contextList.begin();it!=contextList.end();it++)
[300]393       {
394         finished=it->second->eventLoop() ;
395         if (finished)
396         {
397           contextList.erase(it) ;
398           break ;
399         }
400       }
[490]401
[300]402     }
[490]403
404     //! Get rank of the current process
405     int CServer::getRank()
406     {
407       return rank;
408     }
409
[523]410    /*!
411    * Open a file specified by a suffix and an extension and use it for the given file buffer.
412    * The file name will be suffix+rank+extension.
413    *
414    * \param fileName[in] protype file name
415    * \param ext [in] extension of the file
416    * \param fb [in/out] the file buffer
417    */
418    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
419    {
420      StdStringStream fileNameClient;
421      int numDigit = 0;
422      int size = 0;
423      MPI_Comm_size(CXios::globalComm, &size);
424      while (size)
425      {
426        size /= 10;
427        ++numDigit;
428      }
[497]429
[523]430      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext;
431      fb->open(fileNameClient.str().c_str(), std::ios::out);
432      if (!fb->is_open())
433        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
434              << std::endl << "Can not open <" << fileNameClient << "> file to write the server log(s).");
435    }
[490]436
[523]437    /*!
438    * \brief Open a file stream to write the info logs
439    * Open a file stream with a specific file name suffix+rank
440    * to write the info logs.
441    * \param fileName [in] protype file name
442    */
443    void CServer::openInfoStream(const StdString& fileName)
444    {
445      std::filebuf* fb = m_infoStream.rdbuf();
446      openStream(fileName, ".out", fb);
[490]447
[523]448      info.write2File(fb);
449      report.write2File(fb);
450    }
[490]451
[523]452    //! Write the info logs to standard output
453    void CServer::openInfoStream()
454    {
455      info.write2StdOut();
456      report.write2StdOut();
457    }
[490]458
[523]459    //! Close the info logs file if it opens
460    void CServer::closeInfoStream()
461    {
462      if (m_infoStream.is_open()) m_infoStream.close();
463    }
464
465    /*!
466    * \brief Open a file stream to write the error log
467    * Open a file stream with a specific file name suffix+rank
468    * to write the error log.
469    * \param fileName [in] protype file name
470    */
471    void CServer::openErrorStream(const StdString& fileName)
472    {
473      std::filebuf* fb = m_errorStream.rdbuf();
474      openStream(fileName, ".err", fb);
475
476      error.write2File(fb);
477    }
478
479    //! Write the error log to standard error output
480    void CServer::openErrorStream()
481    {
482      error.write2StdErr();
483    }
484
485    //! Close the error log file if it opens
486    void CServer::closeErrorStream()
487    {
488      if (m_errorStream.is_open()) m_errorStream.close();
489    }
[300]490}
Note: See TracBrowser for help on using the repository browser.