source: XIOS/dev/branch_yushan/src/server.cpp @ 1037

Last change on this file since 1037 was 1037, checked in by yushan, 7 years ago

initialize the branch

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 14.6 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "type.hpp"
6#include "context.hpp"
7#include "object_template.hpp"
8#include "oasis_cinterface.hpp"
9#include <boost/functional/hash.hpp>
10#include <boost/algorithm/string.hpp>
11//#include "mpi.hpp"
12#include "tracer.hpp"
13#include "timer.hpp"
14#include "event_scheduler.hpp"
15
16namespace xios
17{
18    MPI_Comm CServer::intraComm ;
19    list<MPI_Comm> CServer::interComm ;
20    std::list<MPI_Comm> CServer::contextInterComms;
21    bool CServer::isRoot ;
22    int CServer::rank = INVALID_RANK;
23    StdOFStream CServer::m_infoStream;
24    StdOFStream CServer::m_errorStream;
25    map<string,CContext*> CServer::contextList ;
26    bool CServer::finished=false ;
27    bool CServer::is_MPI_Initialized ;
28
29   
30    CEventScheduler* CServer::eventScheduler = 0;
31   
32    void CServer::initialize(void)
33    {
34      // Not using OASIS
35      if (!CXios::usingOasis)
36      {
37
38        CTimer::get("XIOS").resume() ;
39
40        boost::hash<string> hashString ;
41
42        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
43        unsigned long* hashAll ;
44
45
46        int size ;
47        int myColor ;
48        int i,c ;
49        MPI_Comm newComm ;
50
51        MPI_Comm_size(CXios::globalComm,&size) ;
52        MPI_Comm_rank(CXios::globalComm,&rank);
53        hashAll=new unsigned long[size] ;
54
55        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
56
57        map<unsigned long, int> colors ;
58        map<unsigned long, int> leaders ;
59        map<unsigned long, int>::iterator it ;
60
61        for(i=0,c=0;i<size;i++)
62        {
63          if (colors.find(hashAll[i])==colors.end())
64          {
65            colors[hashAll[i]]=c ;
66            leaders[hashAll[i]]=i ;
67            c++ ;
68          }
69        }
70
71        myColor=colors[hashServer] ;
72
73
74        MPI_Comm_split(CXios::globalComm,myColor,rank,&intraComm) ;
75
76       
77        int serverLeader=leaders[hashServer] ;
78        int clientLeader;
79
80         serverLeader=leaders[hashServer] ;
81         for(it=leaders.begin();it!=leaders.end();it++)
82         {
83           if (it->first!=hashServer)
84           {
85             clientLeader=it->second ;
86             int intraCommSize, intraCommRank ;
87             MPI_Comm_size(intraComm,&intraCommSize) ;
88             MPI_Comm_rank(intraComm,&intraCommRank) ;
89             info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
90                     <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
91
92             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
93             interComm.push_back(newComm) ;
94             printf("after inter create, interComm.size = %lu\n", interComm.size());
95           }
96         }
97
98         delete [] hashAll ;
99      }
100      // using OASIS
101      else
102      {
103        int size;
104        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
105
106        CTimer::get("XIOS").resume() ;
107        MPI_Comm localComm;
108        oasis_get_localcomm(localComm);
109        MPI_Comm_dup(localComm, &intraComm);
110
111        MPI_Comm_rank(intraComm,&rank) ;
112        MPI_Comm_size(intraComm,&size) ;
113        string codesId=CXios::getin<string>("oasis_codes_id") ;
114
115        vector<string> splitted ;
116        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
117        vector<string>::iterator it ;
118
119        MPI_Comm newComm ;
120        int globalRank ;
121        MPI_Comm_rank(CXios::globalComm,&globalRank);
122
123        for(it=splitted.begin();it!=splitted.end();it++)
124        {
125          oasis_get_intercomm(newComm,*it) ;
126          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
127          MPI_Comm_remote_size(newComm,&size);
128          interComm.push_back(newComm) ;
129        }
130              oasis_enddef() ;
131      }
132
133//      int rank;
134      MPI_Comm_rank(intraComm,&rank) ;
135      if (rank==0) isRoot=true;
136      else isRoot=false;
137     
138      eventScheduler = new CEventScheduler(intraComm) ;
139    }
140
141    void CServer::finalize(void)
142    {
143      CTimer::get("XIOS").suspend() ;
144     
145      delete eventScheduler ;
146     
147     
148
149      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
150        MPI_Comm_free(&(*it));
151
152      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++)
153        MPI_Comm_free(&(*it));
154
155      MPI_Comm_free(&intraComm);
156
157      if (!is_MPI_Initialized)
158      {
159        if (CXios::usingOasis) oasis_finalize();
160        else  {MPI_Finalize() ; printf("CServer::finalize called MPI_finalize\n");}
161      }
162
163     
164      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
165      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
166      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
167    }
168
169     void CServer::eventLoop(void)
170     {
171       bool stop=false ;
172
173       CTimer::get("XIOS server").resume() ;
174       while(!stop)
175       {
176         if (isRoot)
177         {
178           listenContext(); 
179           if (!finished) listenFinalize() ; 
180         }
181         else
182         {
183           listenRootContext(); 
184           if (!finished) 
185           {
186             listenRootFinalize() ; 
187           }
188         }
189         
190         contextEventLoop() ;
191         if (finished && contextList.empty()) stop=true ;
192         
193         eventScheduler->checkEvent() ;
194       }
195       
196       
197       CTimer::get("XIOS server").suspend() ;
198     }
199
200     void CServer::listenFinalize(void)
201     {
202        list<MPI_Comm>::iterator it;
203        int msg ;
204        int flag ;
205       
206
207        for(it=interComm.begin();it!=interComm.end();it++)
208        {
209           MPI_Status status ;
210           traceOff() ;
211           MPI_Iprobe(0,0,*it,&flag,&status) ;
212           traceOn() ;
213           if (flag==true)
214           {
215              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
216              printf(" CServer : Receive client finalize\n");
217              info(20)<<" CServer : Receive client finalize"<<endl ;
218
219              MPI_Comm_free(&(*it));
220              interComm.erase(it) ;
221              break ;
222            }
223         }
224
225         if (interComm.empty())
226         {
227           int i,size ;
228           MPI_Comm_size(intraComm,&size) ;
229           MPI_Request* requests= new MPI_Request[size-1] ;
230           MPI_Status* status= new MPI_Status[size-1] ;
231
232           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
233           MPI_Waitall(size-1,requests,status) ;
234
235           finished=true ;
236           delete [] requests ;
237           delete [] status ;
238         }
239     }
240
241
242     void CServer::listenRootFinalize()
243     {
244        int flag ;
245        MPI_Status status ;
246        int msg ;
247       
248        traceOff() ;
249        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
250        traceOn() ;
251        if (flag==true)
252        {
253           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
254           finished=true ;
255        }
256      }
257
258     void CServer::listenContext(void)
259     {
260
261       MPI_Status status ;
262       int flag = false ;
263       static void* buffer ;
264       static MPI_Request request ;
265       static bool recept=false ;
266       int rank ;
267       int count ; 
268
269       if (recept==false)
270       {     
271         traceOff() ;
272         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
273         traceOn() ;
274         
275         if (flag==true)
276         {
277           #ifdef _usingMPI
278           rank=status.MPI_SOURCE ;
279           #elif _usingEP
280           rank= status.ep_src ;
281           #endif
282           MPI_Get_count(&status,MPI_CHAR,&count) ;
283           buffer=new char[count] ;
284           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
285           recept=true ;
286         }
287         
288       }
289       else
290       {
291         traceOff() ;
292         MPI_Test(&request,&flag,&status) ;
293         traceOn() ;
294         if (flag==true)
295         {
296           #ifdef _usingMPI
297           rank=status.MPI_SOURCE ;
298           #elif _usingEP
299           rank= status.ep_src ;
300           #endif
301           MPI_Get_count(&status,MPI_CHAR,&count) ;
302           recvContextMessage(buffer,count) ;
303           printf("listerContext register context OK, interComm size = %lu\n", interComm.size());
304           
305           delete [] buffer ;
306           recept=false ;
307         }
308       }
309     }
310
311     void CServer::recvContextMessage(void* buff,int count)
312     {
313       static map<string,contextMessage> recvContextId ;
314       map<string,contextMessage>::iterator it ;
315
316       CBufferIn buffer(buff,count) ;
317       string id ;
318       int clientLeader ;
319       int nbMessage ;
320
321       buffer>>id>>nbMessage>>clientLeader ;
322
323       it=recvContextId.find(id) ;
324       if (it==recvContextId.end())
325       {
326         contextMessage msg={0,0} ;
327         pair<map<string,contextMessage>::iterator,bool> ret ;
328         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
329         it=ret.first ;
330       }
331       it->second.nbRecv+=1 ;
332       it->second.leaderRank+=clientLeader ;
333
334       if (it->second.nbRecv==nbMessage)
335       {
336         int size ;
337         MPI_Comm_size(intraComm,&size) ;
338         MPI_Request* requests= new MPI_Request[size-1] ;
339         MPI_Status* status= new MPI_Status[size-1] ;
340
341         for(int i=1;i<size;i++)
342         {
343            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
344         }
345         MPI_Waitall(size-1,requests,status) ;
346         registerContext(buff,count,it->second.leaderRank) ;
347         printf("recvContextMessage register context OK\n");
348
349         recvContextId.erase(it) ;
350         delete [] requests ;
351         delete [] status ;
352
353       }
354     }
355
356     void CServer::listenRootContext(void)
357     {
358
359       MPI_Status status ;
360       int flag ;
361       static void* buffer ;
362       static MPI_Request request ;
363       static bool recept=false ;
364       int rank ;
365       int count ;
366       const int root=0 ;
367       
368
369       if (recept==false)
370       {
371         traceOff() ;
372         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
373         traceOn() ;
374         if (flag==true)
375         {
376           MPI_Get_count(&status,MPI_CHAR,&count) ;
377           buffer=new char[count] ;
378           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
379           recept=true ;
380         }
381       }
382       else
383       {
384         MPI_Test(&request,&flag,&status) ;
385         if (flag==true)
386         {
387           MPI_Get_count(&status,MPI_CHAR,&count) ;
388           registerContext(buffer,count) ;
389           printf("listenRootContext register context OK, interComm size = %lu\n", interComm.size());
390           delete [] buffer ;
391           recept=false ;
392         }
393       }
394     }
395
396     void CServer::registerContext(void* buff, int count, int leaderRank)
397     {
398       string contextId;
399       CBufferIn buffer(buff, count);
400       buffer >> contextId;
401
402       info(20) << "CServer : Register new Context : " << contextId << endl;
403
404       if (contextList.find(contextId) != contextList.end())
405         ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
406               << "Context '" << contextId << "' has already been registred");
407
408       MPI_Comm contextIntercomm;
409       
410       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm);
411
412       MPI_Comm inter;
413       MPI_Intercomm_merge(contextIntercomm,1,&inter);
414       MPI_Barrier(inter);
415       
416
417       CContext* context=CContext::create(contextId);
418       contextList[contextId]=context;
419       context->initServer(intraComm,contextIntercomm);
420       
421       
422
423       contextInterComms.push_back(contextIntercomm);
424       
425       
426       MPI_Comm_free(&inter);
427       
428       printf(" ****   server: register context OK\n");
429     }
430
431     void CServer::contextEventLoop(void)
432     {
433       bool finished ;
434       map<string,CContext*>::iterator it ;
435       for(it=contextList.begin();it!=contextList.end();it++)
436       {
437         finished=it->second->checkBuffersAndListen();
438         if (finished)
439         {
440           contextList.erase(it) ;
441           break ;
442         }
443       }
444
445     }
446
447     //! Get rank of the current process
448     int CServer::getRank()
449     {
450       return rank;
451     }
452
453    /*!
454    * Open a file specified by a suffix and an extension and use it for the given file buffer.
455    * The file name will be suffix+rank+extension.
456    *
457    * \param fileName[in] protype file name
458    * \param ext [in] extension of the file
459    * \param fb [in/out] the file buffer
460    */
461    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
462    {
463      StdStringStream fileNameClient;
464      int numDigit = 0;
465      int size = 0;
466      MPI_Comm_size(CXios::globalComm, &size);
467      while (size)
468      {
469        size /= 10;
470        ++numDigit;
471      }
472
473      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext;
474      fb->open(fileNameClient.str().c_str(), std::ios::out);
475      if (!fb->is_open())
476        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
477              << std::endl << "Can not open <" << fileNameClient << "> file to write the server log(s).");
478    }
479
480    /*!
481    * \brief Open a file stream to write the info logs
482    * Open a file stream with a specific file name suffix+rank
483    * to write the info logs.
484    * \param fileName [in] protype file name
485    */
486    void CServer::openInfoStream(const StdString& fileName)
487    {
488      std::filebuf* fb = m_infoStream.rdbuf();
489      openStream(fileName, ".out", fb);
490
491      info.write2File(fb);
492      report.write2File(fb);
493    }
494
495    //! Write the info logs to standard output
496    void CServer::openInfoStream()
497    {
498      info.write2StdOut();
499      report.write2StdOut();
500    }
501
502    //! Close the info logs file if it opens
503    void CServer::closeInfoStream()
504    {
505      if (m_infoStream.is_open()) m_infoStream.close();
506    }
507
508    /*!
509    * \brief Open a file stream to write the error log
510    * Open a file stream with a specific file name suffix+rank
511    * to write the error log.
512    * \param fileName [in] protype file name
513    */
514    void CServer::openErrorStream(const StdString& fileName)
515    {
516      std::filebuf* fb = m_errorStream.rdbuf();
517      openStream(fileName, ".err", fb);
518
519      error.write2File(fb);
520    }
521
522    //! Write the error log to standard error output
523    void CServer::openErrorStream()
524    {
525      error.write2StdErr();
526    }
527
528    //! Close the error log file if it opens
529    void CServer::closeErrorStream()
530    {
531      if (m_errorStream.is_open()) m_errorStream.close();
532    }
533}
Note: See TracBrowser for help on using the repository browser.