source: XIOS/trunk/src/server.cpp @ 539

Last change on this file since 539 was 523, checked in by rlacroix, 9 years ago

Improve the message error handling by mimicking the behavior of the info/report logs.

Output the error messages to the standart error message until the context is correctly initialized. Then, output the error messages to a file if the user has set "print_file" parameter to "true".

  • Fix: Errors that occured before MPI was initialized (e.g. during the config file parsing) caused a MPI error on top of the original error.
  • Fix: The error file could sometimes be misnamed if the error happened before the context was completely known.
  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 13.4 KB
Line 
1#include "globalScopeData.hpp"
2#include "xmlioserver_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "type.hpp"
6#include "context.hpp"
7#include "object_template.hpp"
8#include "oasis_cinterface.hpp"
9#include <boost/functional/hash.hpp>
10#include <boost/algorithm/string.hpp>
11#include "mpi.hpp"
12#include "tracer.hpp"
13#include "timer.hpp"
14#include "event_scheduler.hpp"
15
16namespace xios
17{
18    MPI_Comm CServer::intraComm ;
19    list<MPI_Comm> CServer::interComm ;
20    bool CServer::isRoot ;
21    int CServer::rank = INVALID_RANK;
22    StdOFStream CServer::m_infoStream;
23    StdOFStream CServer::m_errorStream;
24    map<string,CContext*> CServer::contextList ;
25    bool CServer::finished=false ;
26    bool CServer::is_MPI_Initialized ;
27    CEventScheduler* CServer::eventScheduler ;
28
29    void CServer::initialize(void)
30    {
31      int initialized ;
32      MPI_Initialized(&initialized) ;
33      if (initialized) is_MPI_Initialized=true ;
34      else is_MPI_Initialized=false ;
35
36      // Not using OASIS
37      if (!CXios::usingOasis)
38      {
39
40        if (!is_MPI_Initialized)
41        {
42          int argc=0;
43          char** argv=NULL;
44          MPI_Init(&argc,&argv) ;
45        }
46        CTimer::get("XIOS").resume() ;
47
48        boost::hash<string> hashString ;
49
50        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
51        unsigned long* hashAll ;
52
53//        int rank ;
54        int size ;
55        int myColor ;
56        int i,c ;
57        MPI_Comm newComm ;
58
59        MPI_Comm_size(CXios::globalComm,&size) ;
60        MPI_Comm_rank(CXios::globalComm,&rank);
61        hashAll=new unsigned long[size] ;
62
63        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
64
65        map<unsigned long, int> colors ;
66        map<unsigned long, int> leaders ;
67        map<unsigned long, int>::iterator it ;
68
69        for(i=0,c=0;i<size;i++)
70        {
71          if (colors.find(hashAll[i])==colors.end())
72          {
73            colors[hashAll[i]]=c ;
74            leaders[hashAll[i]]=i ;
75            c++ ;
76          }
77        }
78
79        myColor=colors[hashServer] ;
80        MPI_Comm_split(MPI_COMM_WORLD,myColor,rank,&intraComm) ;
81
82        int serverLeader=leaders[hashServer] ;
83        int clientLeader;
84
85         serverLeader=leaders[hashServer] ;
86         for(it=leaders.begin();it!=leaders.end();it++)
87         {
88           if (it->first!=hashServer)
89           {
90             clientLeader=it->second ;
91             int intraCommSize, intraCommRank ;
92             MPI_Comm_size(intraComm,&intraCommSize) ;
93             MPI_Comm_rank(intraComm,&intraCommRank) ;
94             info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
95                     <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
96
97             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
98             interComm.push_back(newComm) ;
99           }
100         }
101
102         delete [] hashAll ;
103      }
104      // using OASIS
105      else
106      {
107//        int rank ,size;
108        int size;
109        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
110
111        CTimer::get("XIOS").resume() ;
112        oasis_get_localcomm(intraComm) ;
113        MPI_Comm_rank(intraComm,&rank) ;
114        MPI_Comm_size(intraComm,&size) ;
115        string codesId=CXios::getin<string>("oasis_codes_id") ;
116
117        vector<string> splitted ;
118        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
119        vector<string>::iterator it ;
120
121        MPI_Comm newComm ;
122        int globalRank ;
123        MPI_Comm_rank(CXios::globalComm,&globalRank);
124
125        for(it=splitted.begin();it!=splitted.end();it++)
126        {
127          oasis_get_intercomm(newComm,*it) ;
128          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
129          MPI_Comm_remote_size(newComm,&size);
130          interComm.push_back(newComm) ;
131        }
132              oasis_enddef() ;
133      }
134
135//      int rank;
136      MPI_Comm_rank(intraComm,&rank) ;
137      if (rank==0) isRoot=true;
138      else isRoot=false;
139     
140      eventScheduler = new CEventScheduler(intraComm) ;
141    }
142
143    void CServer::finalize(void)
144    {
145      CTimer::get("XIOS").suspend() ;
146     
147      delete eventScheduler ;
148     
149      if (!is_MPI_Initialized)
150      {
151        if (CXios::usingOasis) oasis_finalize();
152        else MPI_Finalize() ;
153      }
154      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
155      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
156      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
157    }
158
159     void CServer::eventLoop(void)
160     {
161       bool stop=false ;
162
163       CTimer::get("XIOS server").resume() ;
164       while(!stop)
165       {
166         if (isRoot)
167         {
168           listenContext();
169           if (!finished) listenFinalize() ;
170         }
171         else
172         {
173           listenRootContext();
174           if (!finished) listenRootFinalize() ;
175         }
176
177         contextEventLoop() ;
178         if (finished && contextList.empty()) stop=true ;
179         if (! CXios::isServer) eventScheduler->checkEvent() ;
180       }
181       CTimer::get("XIOS server").suspend() ;
182     }
183
184     void CServer::listenFinalize(void)
185     {
186        list<MPI_Comm>::iterator it;
187        int msg ;
188        int flag ;
189
190        for(it=interComm.begin();it!=interComm.end();it++)
191        {
192           MPI_Status status ;
193           traceOff() ;
194           MPI_Iprobe(0,0,*it,&flag,&status) ;
195           traceOn() ;
196           if (flag==true)
197           {
198              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
199              info(20)<<" CServer : Receive client finalize"<<endl ;
200              interComm.erase(it) ;
201              break ;
202            }
203         }
204
205         if (interComm.empty())
206         {
207           int i,size ;
208           MPI_Comm_size(intraComm,&size) ;
209           MPI_Request* requests= new MPI_Request[size-1] ;
210           MPI_Status* status= new MPI_Status[size-1] ;
211
212           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
213           MPI_Waitall(size-1,requests,status) ;
214
215           finished=true ;
216           delete [] requests ;
217           delete [] status ;
218         }
219     }
220
221
222     void CServer::listenRootFinalize()
223     {
224        int flag ;
225        MPI_Status status ;
226        int msg ;
227
228        traceOff() ;
229        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
230        traceOn() ;
231        if (flag==true)
232        {
233           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
234           finished=true ;
235        }
236      }
237
238     void CServer::listenContext(void)
239     {
240
241       MPI_Status status ;
242       int flag ;
243       static void* buffer ;
244       static MPI_Request request ;
245       static bool recept=false ;
246       int rank ;
247       int count ;
248
249       if (recept==false)
250       {
251         traceOff() ;
252         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
253         traceOn() ;
254         if (flag==true)
255         {
256           rank=status.MPI_SOURCE ;
257           MPI_Get_count(&status,MPI_CHAR,&count) ;
258           buffer=new char[count] ;
259           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
260           recept=true ;
261         }
262       }
263       else
264       {
265         traceOff() ;
266         MPI_Test(&request,&flag,&status) ;
267         traceOn() ;
268         if (flag==true)
269         {
270           rank=status.MPI_SOURCE ;
271           MPI_Get_count(&status,MPI_CHAR,&count) ;
272           recvContextMessage(buffer,count) ;
273           delete [] buffer ;
274           recept=false ;
275         }
276       }
277     }
278
279     void CServer::recvContextMessage(void* buff,int count)
280     {
281       static map<string,contextMessage> recvContextId ;
282       map<string,contextMessage>::iterator it ;
283
284       CBufferIn buffer(buff,count) ;
285       string id ;
286       int clientLeader ;
287       int nbMessage ;
288
289       buffer>>id>>nbMessage>>clientLeader ;
290
291       it=recvContextId.find(id) ;
292       if (it==recvContextId.end())
293       {
294         contextMessage msg={0,0} ;
295         pair<map<string,contextMessage>::iterator,bool> ret ;
296         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
297         it=ret.first ;
298       }
299       it->second.nbRecv+=1 ;
300       it->second.leaderRank+=clientLeader ;
301
302       if (it->second.nbRecv==nbMessage)
303       {
304         int size ;
305         MPI_Comm_size(intraComm,&size) ;
306         MPI_Request* requests= new MPI_Request[size-1] ;
307         MPI_Status* status= new MPI_Status[size-1] ;
308
309         for(int i=1;i<size;i++)
310         {
311            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
312         }
313         MPI_Waitall(size-1,requests,status) ;
314         registerContext(buff,count,it->second.leaderRank) ;
315
316         recvContextId.erase(it) ;
317         delete [] requests ;
318         delete [] status ;
319
320       }
321     }
322
323     void CServer::listenRootContext(void)
324     {
325
326       MPI_Status status ;
327       int flag ;
328       static void* buffer ;
329       static MPI_Request request ;
330       static bool recept=false ;
331       int rank ;
332       int count ;
333       const int root=0 ;
334
335       if (recept==false)
336       {
337         traceOff() ;
338         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
339         traceOn() ;
340         if (flag==true)
341         {
342           MPI_Get_count(&status,MPI_CHAR,&count) ;
343           buffer=new char[count] ;
344           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
345           recept=true ;
346         }
347       }
348       else
349       {
350         MPI_Test(&request,&flag,&status) ;
351         if (flag==true)
352         {
353           MPI_Get_count(&status,MPI_CHAR,&count) ;
354           registerContext(buffer,count) ;
355           delete [] buffer ;
356           recept=false ;
357         }
358       }
359     }
360
361
362
363     void CServer::registerContext(void* buff,int count, int leaderRank)
364     {
365
366       string contextId;
367       CBufferIn buffer(buff,count) ;
368
369       buffer>>contextId ;
370       MPI_Comm contextIntercomm ;
371       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm) ;
372
373       info(20)<<"CServer : Register new Context : "<<contextId<<endl  ;
374       MPI_Comm inter ;
375       MPI_Intercomm_merge(contextIntercomm,1,&inter) ;
376       MPI_Barrier(inter) ;
377       if (contextList.find(contextId)!=contextList.end())
378        ERROR("void CServer::registerContext(void* buff,int count, int leaderRank)",
379              <<"Context has already been registred") ;
380
381      CContext* context=CContext::create(contextId) ;
382      contextList[contextId]=context ;
383      context->initServer(intraComm,contextIntercomm) ;
384
385     }
386
387
388     void CServer::contextEventLoop(void)
389     {
390       bool finished ;
391       map<string,CContext*>::iterator it ;
392       for(it=contextList.begin();it!=contextList.end();it++)
393       {
394         finished=it->second->eventLoop() ;
395         if (finished)
396         {
397           contextList.erase(it) ;
398           break ;
399         }
400       }
401
402     }
403
404     //! Get rank of the current process
405     int CServer::getRank()
406     {
407       return rank;
408     }
409
410    /*!
411    * Open a file specified by a suffix and an extension and use it for the given file buffer.
412    * The file name will be suffix+rank+extension.
413    *
414    * \param fileName[in] protype file name
415    * \param ext [in] extension of the file
416    * \param fb [in/out] the file buffer
417    */
418    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
419    {
420      StdStringStream fileNameClient;
421      int numDigit = 0;
422      int size = 0;
423      MPI_Comm_size(CXios::globalComm, &size);
424      while (size)
425      {
426        size /= 10;
427        ++numDigit;
428      }
429
430      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext;
431      fb->open(fileNameClient.str().c_str(), std::ios::out);
432      if (!fb->is_open())
433        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
434              << std::endl << "Can not open <" << fileNameClient << "> file to write the server log(s).");
435    }
436
437    /*!
438    * \brief Open a file stream to write the info logs
439    * Open a file stream with a specific file name suffix+rank
440    * to write the info logs.
441    * \param fileName [in] protype file name
442    */
443    void CServer::openInfoStream(const StdString& fileName)
444    {
445      std::filebuf* fb = m_infoStream.rdbuf();
446      openStream(fileName, ".out", fb);
447
448      info.write2File(fb);
449      report.write2File(fb);
450    }
451
452    //! Write the info logs to standard output
453    void CServer::openInfoStream()
454    {
455      info.write2StdOut();
456      report.write2StdOut();
457    }
458
459    //! Close the info logs file if it opens
460    void CServer::closeInfoStream()
461    {
462      if (m_infoStream.is_open()) m_infoStream.close();
463    }
464
465    /*!
466    * \brief Open a file stream to write the error log
467    * Open a file stream with a specific file name suffix+rank
468    * to write the error log.
469    * \param fileName [in] protype file name
470    */
471    void CServer::openErrorStream(const StdString& fileName)
472    {
473      std::filebuf* fb = m_errorStream.rdbuf();
474      openStream(fileName, ".err", fb);
475
476      error.write2File(fb);
477    }
478
479    //! Write the error log to standard error output
480    void CServer::openErrorStream()
481    {
482      error.write2StdErr();
483    }
484
485    //! Close the error log file if it opens
486    void CServer::closeErrorStream()
487    {
488      if (m_errorStream.is_open()) m_errorStream.close();
489    }
490}
Note: See TracBrowser for help on using the repository browser.