source: XIOS/dev/branch_yushan/src/server.cpp @ 1067

Last change on this file since 1067 was 1067, checked in by yushan, 8 years ago

server mode OK tested with test_complete

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 14.6 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "type.hpp"
6#include "context.hpp"
7#include "object_template.hpp"
8#include "oasis_cinterface.hpp"
9#include <boost/functional/hash.hpp>
10#include <boost/algorithm/string.hpp>
11//#include "mpi.hpp"
12#include "tracer.hpp"
13#include "timer.hpp"
14#include "event_scheduler.hpp"
15
16namespace xios
17{
18    MPI_Comm CServer::intraComm ;
19    list<MPI_Comm> CServer::interComm ;
20    std::list<MPI_Comm> CServer::contextInterComms;
21    bool CServer::isRoot ;
22    int CServer::rank = INVALID_RANK;
23    StdOFStream CServer::m_infoStream;
24    StdOFStream CServer::m_errorStream;
25    map<string,CContext*> CServer::contextList ;
26    bool CServer::finished=false ;
27    bool CServer::is_MPI_Initialized ;
28
29   
30    CEventScheduler* CServer::eventScheduler = 0;
31   
32    void CServer::initialize(void)
33    {
34      // int initialized ;
35      // MPI_Initialized(&initialized) ;
36      // if (initialized) is_MPI_Initialized=true ;
37      // else is_MPI_Initialized=false ;
38
39      // Not using OASIS
40      if (!CXios::usingOasis)
41      {
42        // if (!is_MPI_Initialized)
43        // {
44        //   MPI_Init(NULL, NULL);
45        // }
46
47        CTimer::get("XIOS").resume() ;
48
49        boost::hash<string> hashString ;
50
51        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
52        unsigned long* hashAll ;
53
54
55        int size ;
56        int myColor ;
57        int i,c ;
58        MPI_Comm newComm ;
59
60        MPI_Comm_size(CXios::globalComm,&size) ;
61        MPI_Comm_rank(CXios::globalComm,&rank);
62        hashAll=new unsigned long[size] ;
63
64        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
65
66        map<unsigned long, int> colors ;
67        map<unsigned long, int> leaders ;
68        map<unsigned long, int>::iterator it ;
69
70        for(i=0,c=0;i<size;i++)
71        {
72          if (colors.find(hashAll[i])==colors.end())
73          {
74            colors[hashAll[i]]=c ;
75            leaders[hashAll[i]]=i ;
76            c++ ;
77          }
78        }
79
80        myColor=colors[hashServer] ;
81
82
83        MPI_Comm_split(CXios::globalComm,myColor,rank,&intraComm) ;
84
85       
86        int serverLeader=leaders[hashServer] ;
87        int clientLeader;
88
89         serverLeader=leaders[hashServer] ;
90         for(it=leaders.begin();it!=leaders.end();++it)
91         {
92           if (it->first!=hashServer)
93           {
94             clientLeader=it->second ;
95             int intraCommSize, intraCommRank ;
96             MPI_Comm_size(intraComm,&intraCommSize) ;
97             MPI_Comm_rank(intraComm,&intraCommRank) ;
98             info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
99                     <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
100
101             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
102             interComm.push_back(newComm) ;
103             printf("after inter create, interComm.size = %lu\n", interComm.size());
104           }
105         }
106
107         delete [] hashAll ;
108      }
109      // using OASIS
110      else
111      {
112        int size;
113        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
114
115        CTimer::get("XIOS").resume() ;
116        MPI_Comm localComm;
117        oasis_get_localcomm(localComm);
118        MPI_Comm_dup(localComm, &intraComm);
119
120        MPI_Comm_rank(intraComm,&rank) ;
121        MPI_Comm_size(intraComm,&size) ;
122        string codesId=CXios::getin<string>("oasis_codes_id") ;
123
124        vector<string> splitted ;
125        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
126        vector<string>::iterator it ;
127
128        MPI_Comm newComm ;
129        int globalRank ;
130        MPI_Comm_rank(CXios::globalComm,&globalRank);
131
132        for(it=splitted.begin();it!=splitted.end();it++)
133        {
134          oasis_get_intercomm(newComm,*it) ;
135          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
136          MPI_Comm_remote_size(newComm,&size);
137          interComm.push_back(newComm) ;
138        }
139              oasis_enddef() ;
140      }
141
142      MPI_Comm_rank(intraComm,&rank) ;
143      if (rank==0) isRoot=true;
144      else isRoot=false;
145     
146      eventScheduler = new CEventScheduler(intraComm) ;
147    }
148
149    void CServer::finalize(void)
150    {
151      CTimer::get("XIOS").suspend() ;
152     
153      delete eventScheduler ;
154     
155     
156
157      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); ++it)
158        MPI_Comm_free(&(*it));
159
160      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); ++it)
161        MPI_Comm_free(&(*it));
162
163      MPI_Comm_free(&intraComm);
164
165      if (!is_MPI_Initialized)
166      {
167        if (CXios::usingOasis) oasis_finalize();
168        //else  {MPI_Finalize() ; printf("CServer::finalize called MPI_finalize\n");}
169      }
170
171     
172      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
173      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
174      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
175    }
176
177     void CServer::eventLoop(void)
178     {
179       bool stop=false ;
180
181       CTimer::get("XIOS server").resume() ;
182       while(!stop)
183       {
184         if (isRoot)
185         {
186           listenContext(); 
187           if (!finished) listenFinalize() ; 
188         }
189         else
190         {
191           listenRootContext(); 
192           if (!finished) 
193           {
194             listenRootFinalize() ; 
195           }
196         }
197         
198         contextEventLoop() ;
199         if (finished && contextList.empty()) stop=true ;
200         
201         eventScheduler->checkEvent() ;
202       }
203       
204       
205       CTimer::get("XIOS server").suspend() ;
206     }
207
208     void CServer::listenFinalize(void)
209     {
210        list<MPI_Comm>::iterator it;
211        int msg ;
212        int flag ;
213       
214
215        for(it=interComm.begin();it!=interComm.end();++it)
216        {
217           MPI_Status status ;
218           traceOff() ;
219           MPI_Iprobe(0,0,*it,&flag,&status) ;
220           traceOn() ;
221           if (flag==true)
222           {
223              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
224              info(20)<<" CServer : Receive client finalize"<<endl ;
225
226              MPI_Comm_free(&(*it));
227              interComm.erase(it) ;
228              break ;
229            }
230         }
231
232         if (interComm.empty())
233         {
234           int i,size ;
235           MPI_Comm_size(intraComm,&size) ;
236           MPI_Request* requests= new MPI_Request[size-1] ;
237           MPI_Status* status= new MPI_Status[size-1] ;
238
239           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
240           MPI_Waitall(size-1,requests,status) ;
241
242           finished=true ;
243           delete [] requests ;
244           delete [] status ;
245         }
246     }
247
248
249     void CServer::listenRootFinalize()
250     {
251        int flag ;
252        MPI_Status status ;
253        int msg ;
254       
255        traceOff() ;
256        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
257        traceOn() ;
258        if (flag==true)
259        {
260           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
261           finished=true ;
262        }
263      }
264
265     void CServer::listenContext(void)
266     {
267
268       MPI_Status status ;
269       int flag = false ;
270       static void* buffer ;
271       static MPI_Request request ;
272       static bool recept=false ;
273       int rank ;
274       int count ; 
275
276       if (recept==false)
277       {     
278         traceOff() ;
279         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
280         traceOn() ;
281         
282         if (flag==true)
283         {
284           #ifdef _usingMPI
285           rank=status.MPI_SOURCE ;
286           #elif _usingEP
287           rank= status.ep_src ;
288           #endif
289           MPI_Get_count(&status,MPI_CHAR,&count) ;
290           buffer=new char[count] ;
291           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
292           recept=true ;
293         }
294         
295       }
296       else
297       {
298         traceOff() ;
299         MPI_Test(&request,&flag,&status) ;
300         traceOn() ;
301         if (flag==true)
302         {
303           #ifdef _usingMPI
304           rank=status.MPI_SOURCE ;
305           #elif _usingEP
306           rank= status.ep_src ;
307           #endif
308           MPI_Get_count(&status,MPI_CHAR,&count) ;
309           recvContextMessage(buffer,count) ;
310           
311           delete [] buffer ;
312           recept=false ;
313         }
314       }
315     }
316
317     void CServer::recvContextMessage(void* buff,int count)
318     {
319       static map<string,contextMessage> recvContextId ;
320       map<string,contextMessage>::iterator it ;
321
322       CBufferIn buffer(buff,count) ;
323       string id ;
324       int clientLeader ;
325       int nbMessage ;
326
327       buffer>>id>>nbMessage>>clientLeader ;
328
329       it=recvContextId.find(id) ;
330       if (it==recvContextId.end())
331       {
332         contextMessage msg={0,0} ;
333         pair<map<string,contextMessage>::iterator,bool> ret ;
334         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
335         it=ret.first ;
336       }
337       it->second.nbRecv+=1 ;
338       it->second.leaderRank+=clientLeader ;
339
340       if (it->second.nbRecv==nbMessage)
341       {
342         int size ;
343         MPI_Comm_size(intraComm,&size) ;
344         MPI_Request* requests= new MPI_Request[size-1] ;
345         MPI_Status* status= new MPI_Status[size-1] ;
346
347         for(int i=1;i<size;i++)
348         {
349            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
350         }
351         MPI_Waitall(size-1,requests,status) ;
352         registerContext(buff,count,it->second.leaderRank) ;
353
354         recvContextId.erase(it) ;
355         delete [] requests ;
356         delete [] status ;
357
358       }
359     }
360
361     void CServer::listenRootContext(void)
362     {
363
364       MPI_Status status ;
365       int flag ;
366       static void* buffer ;
367       static MPI_Request request ;
368       static bool recept=false ;
369       int rank ;
370       int count ;
371       const int root=0 ;
372       
373
374       if (recept==false)
375       {
376         traceOff() ;
377         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
378         traceOn() ;
379         if (flag==true)
380         {
381           MPI_Get_count(&status,MPI_CHAR,&count) ;
382           buffer=new char[count] ;
383           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
384           recept=true ;
385         }
386       }
387       else
388       {
389         MPI_Test(&request,&flag,&status) ;
390         if (flag==true)
391         {
392           MPI_Get_count(&status,MPI_CHAR,&count) ;
393           registerContext(buffer,count) ;
394           delete [] buffer ;
395           recept=false ;
396         }
397       }
398     }
399
400     void CServer::registerContext(void* buff, int count, int leaderRank)
401     {
402       string contextId;
403       CBufferIn buffer(buff, count);
404       buffer >> contextId;
405
406       info(20) << "CServer : Register new Context : " << contextId << endl;
407
408       if (contextList.find(contextId) != contextList.end())
409         ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
410               << "Context '" << contextId << "' has already been registred");
411
412       MPI_Comm contextIntercomm;
413       
414       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm);
415
416       MPI_Comm inter;
417       MPI_Intercomm_merge(contextIntercomm,1,&inter);
418       MPI_Barrier(inter);
419
420       info(20) << "CServer : MPI_Intercomm_merge and MPI_Barrier " << contextId << endl;
421       
422
423       CContext* context=CContext::create(contextId);
424       contextList[contextId]=context;
425       context->initServer(intraComm,contextIntercomm);
426       
427       
428
429       contextInterComms.push_back(contextIntercomm);
430       
431       
432       MPI_Comm_free(&inter);
433       
434       info(20) << "CServer : Register new Context OKOK " << contextId << endl;
435     }
436
437     void CServer::contextEventLoop(void)
438     {
439       bool finished ;
440       map<string,CContext*>::iterator it ;
441       for(it=contextList.begin();it!=contextList.end();++it)
442       {
443         finished=it->second->checkBuffersAndListen();
444         if (finished)
445         {
446           contextList.erase(it) ;
447           break ;
448         }
449       }
450
451     }
452
453     //! Get rank of the current process
454     int CServer::getRank()
455     {
456       return rank;
457     }
458
459    /*!
460    * Open a file specified by a suffix and an extension and use it for the given file buffer.
461    * The file name will be suffix+rank+extension.
462    *
463    * \param fileName[in] protype file name
464    * \param ext [in] extension of the file
465    * \param fb [in/out] the file buffer
466    */
467    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
468    {
469      StdStringStream fileNameClient;
470      int numDigit = 0;
471      int size = 0;
472      MPI_Comm_size(CXios::globalComm, &size);
473      while (size)
474      {
475        size /= 10;
476        ++numDigit;
477      }
478
479      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext;
480      fb->open(fileNameClient.str().c_str(), std::ios::out);
481      if (!fb->is_open())
482        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
483              << std::endl << "Can not open <" << fileNameClient << "> file to write the server log(s).");
484    }
485
486    /*!
487    * \brief Open a file stream to write the info logs
488    * Open a file stream with a specific file name suffix+rank
489    * to write the info logs.
490    * \param fileName [in] protype file name
491    */
492    void CServer::openInfoStream(const StdString& fileName)
493    {
494      std::filebuf* fb = m_infoStream.rdbuf();
495      openStream(fileName, ".out", fb);
496
497      info.write2File(fb);
498      report.write2File(fb);
499    }
500
501    //! Write the info logs to standard output
502    void CServer::openInfoStream()
503    {
504      info.write2StdOut();
505      report.write2StdOut();
506    }
507
508    //! Close the info logs file if it opens
509    void CServer::closeInfoStream()
510    {
511      if (m_infoStream.is_open()) m_infoStream.close();
512    }
513
514    /*!
515    * \brief Open a file stream to write the error log
516    * Open a file stream with a specific file name suffix+rank
517    * to write the error log.
518    * \param fileName [in] protype file name
519    */
520    void CServer::openErrorStream(const StdString& fileName)
521    {
522      std::filebuf* fb = m_errorStream.rdbuf();
523      openStream(fileName, ".err", fb);
524
525      error.write2File(fb);
526    }
527
528    //! Write the error log to standard error output
529    void CServer::openErrorStream()
530    {
531      error.write2StdErr();
532    }
533
534    //! Close the error log file if it opens
535    void CServer::closeErrorStream()
536    {
537      if (m_errorStream.is_open()) m_errorStream.close();
538    }
539}
Note: See TracBrowser for help on using the repository browser.