source: XIOS/dev/branch_yushan_merged/src/server.cpp @ 1134

Last change on this file since 1134 was 1134, checked in by yushan, 7 years ago

branch merged with trunk r1130

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 14.0 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "type.hpp"
6#include "context.hpp"
7#include "object_template.hpp"
8#include "oasis_cinterface.hpp"
9#include <boost/functional/hash.hpp>
10#include <boost/algorithm/string.hpp>
11#include "tracer.hpp"
12#include "timer.hpp"
13#include "event_scheduler.hpp"
14
15namespace xios
16{
17    MPI_Comm CServer::intraComm ;
18    list<MPI_Comm> CServer::interComm ;
19    std::list<MPI_Comm> CServer::contextInterComms;
20    bool CServer::isRoot ;
21    int CServer::rank = INVALID_RANK;
22    StdOFStream CServer::m_infoStream;
23    StdOFStream CServer::m_errorStream;
24    map<string,CContext*> CServer::contextList ;
25    bool CServer::finished=false ;
26    bool CServer::is_MPI_Initialized ;
27
28   
29    CEventScheduler* CServer::eventScheduler = 0;
30   
31    void CServer::initialize(void)
32    {
33      // Not using OASIS
34      if (!CXios::usingOasis)
35      {
36
37        CTimer::get("XIOS").resume() ;
38
39        boost::hash<string> hashString ;
40
41        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
42        unsigned long* hashAll ;
43
44
45        int size ;
46        int myColor ;
47        int i,c ;
48        MPI_Comm newComm ;
49
50        MPI_Comm_size(CXios::globalComm,&size) ;
51        MPI_Comm_rank(CXios::globalComm,&rank);
52        hashAll=new unsigned long[size] ;
53
54        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
55
56        map<unsigned long, int> colors ;
57        map<unsigned long, int> leaders ;
58        map<unsigned long, int>::iterator it ;
59
60        for(i=0,c=0;i<size;i++)
61        {
62          if (colors.find(hashAll[i])==colors.end())
63          {
64            colors[hashAll[i]]=c ;
65            leaders[hashAll[i]]=i ;
66            c++ ;
67          }
68        }
69
70        myColor=colors[hashServer] ;
71
72
73        MPI_Comm_split(CXios::globalComm,myColor,rank,&intraComm) ;
74
75       
76        int serverLeader=leaders[hashServer] ;
77        int clientLeader;
78
79         serverLeader=leaders[hashServer] ;
80         for(it=leaders.begin();it!=leaders.end();++it)
81         {
82           if (it->first!=hashServer)
83           {
84             clientLeader=it->second ;
85             int intraCommSize, intraCommRank ;
86             MPI_Comm_size(intraComm,&intraCommSize) ;
87             MPI_Comm_rank(intraComm,&intraCommRank) ;
88             info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
89                     <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
90
91             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
92             interComm.push_back(newComm) ;
93           }
94         }
95
96         delete [] hashAll ;
97      }
98      // using OASIS
99      else
100      {
101        int size;
102        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
103
104        CTimer::get("XIOS").resume() ;
105        MPI_Comm localComm;
106        oasis_get_localcomm(localComm);
107        MPI_Comm_dup(localComm, &intraComm);
108
109        MPI_Comm_rank(intraComm,&rank) ;
110        MPI_Comm_size(intraComm,&size) ;
111        string codesId=CXios::getin<string>("oasis_codes_id") ;
112
113        vector<string> splitted ;
114        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
115        vector<string>::iterator it ;
116
117        MPI_Comm newComm ;
118        int globalRank ;
119        MPI_Comm_rank(CXios::globalComm,&globalRank);
120
121        for(it=splitted.begin();it!=splitted.end();it++)
122        {
123          oasis_get_intercomm(newComm,*it) ;
124          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
125          MPI_Comm_remote_size(newComm,&size);
126          interComm.push_back(newComm) ;
127        }
128              oasis_enddef() ;
129      }
130
131      MPI_Comm_rank(intraComm,&rank) ;
132      if (rank==0) isRoot=true;
133      else isRoot=false;
134     
135      eventScheduler = new CEventScheduler(intraComm) ;
136    }
137
138    void CServer::finalize(void)
139    {
140      CTimer::get("XIOS").suspend() ;
141     
142      delete eventScheduler ;
143
144      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); ++it)
145        MPI_Comm_free(&(*it));
146      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); ++it)
147        MPI_Comm_free(&(*it));
148
149      MPI_Comm_free(&intraComm);
150
151      if (!is_MPI_Initialized)
152      {
153        if (CXios::usingOasis) oasis_finalize();
154        //else  {MPI_Finalize() ;}
155      }
156
157     
158      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
159      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
160      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
161    }
162
163     void CServer::eventLoop(void)
164     {
165       bool stop=false ;
166
167       CTimer::get("XIOS server").resume() ;
168       while(!stop)
169       {
170         if (isRoot)
171         {
172           listenContext();
173           if (!finished) listenFinalize() ;
174         }
175         else
176         {
177           listenRootContext();
178           if (!finished) 
179           {
180             listenRootFinalize() ;
181           }
182         }
183
184         contextEventLoop() ;
185         if (finished && contextList.empty()) stop=true ;
186         
187         eventScheduler->checkEvent() ;
188       }
189       
190       
191       CTimer::get("XIOS server").suspend() ;
192     }
193
194     void CServer::listenFinalize(void)
195     {
196        list<MPI_Comm>::iterator it;
197        int msg ;
198        int flag ;
199
200        for(it=interComm.begin();it!=interComm.end();++it)
201        {
202           MPI_Status status ;
203           traceOff() ;
204           MPI_Iprobe(0,0,*it,&flag,&status) ;
205           traceOn() ;
206           if (flag==true)
207           {
208              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
209              info(20)<<" CServer : Receive client finalize"<<endl ;
210
211              MPI_Comm_free(&(*it));
212              interComm.erase(it) ;
213              break ;
214            }
215         }
216
217         if (interComm.empty())
218         {
219           int i,size ;
220           MPI_Comm_size(intraComm,&size) ;
221           MPI_Request* requests= new MPI_Request[size-1] ;
222           MPI_Status* status= new MPI_Status[size-1] ;
223
224           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
225           MPI_Waitall(size-1,requests,status) ;
226
227           finished=true ;
228           delete [] requests ;
229           delete [] status ;
230         }
231     }
232
233
234     void CServer::listenRootFinalize()
235     {
236        int flag ;
237        MPI_Status status ;
238        int msg ;
239
240        traceOff() ;
241        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
242        traceOn() ;
243        if (flag==true)
244        {
245           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
246           finished=true ;
247        }
248      }
249
250     void CServer::listenContext(void)
251     {
252
253       MPI_Status status ;
254       int flag ;
255       static char* buffer ;
256       static MPI_Request request ;
257       static bool recept=false ;
258       int rank ;
259       int count ;
260
261       if (recept==false)
262       {
263         traceOff() ;
264         #ifdef _usingEP
265         MPI_Iprobe(-1,1,CXios::globalComm, &flag, &status) ;
266         #else
267         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
268         #endif
269         traceOn() ;
270         
271         if (flag==true)
272         {
273           #ifdef _usingMPI
274           rank=status.MPI_SOURCE ;
275           #elif _usingEP
276           rank= status.ep_src ;
277           #endif
278           MPI_Get_count(&status,MPI_CHAR,&count) ;
279           buffer=new char[count] ;
280           MPI_Irecv((void*)buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
281           recept=true ;
282         }
283       }
284       else
285       {
286         traceOff() ;
287         MPI_Test(&request,&flag,&status) ;
288         traceOn() ;
289         if (flag==true)
290         {
291           #ifdef _usingMPI
292           rank=status.MPI_SOURCE ;
293           #elif _usingEP
294           rank= status.ep_src ;
295           #endif
296           MPI_Get_count(&status,MPI_CHAR,&count) ;
297           recvContextMessage((void*)buffer,count) ;
298           delete [] buffer ;
299           recept=false ;
300         }
301       }
302     }
303
304     void CServer::recvContextMessage(void* buff,int count)
305     {
306       static map<string,contextMessage> recvContextId ;
307       map<string,contextMessage>::iterator it ;
308
309       CBufferIn buffer(buff,count) ;
310       string id ;
311       int clientLeader ;
312       int nbMessage ;
313
314       buffer>>id>>nbMessage>>clientLeader ;
315
316       it=recvContextId.find(id) ;
317       if (it==recvContextId.end())
318       {
319         contextMessage msg={0,0} ;
320         pair<map<string,contextMessage>::iterator,bool> ret ;
321         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
322         it=ret.first ;
323       }
324       it->second.nbRecv+=1 ;
325       it->second.leaderRank+=clientLeader ;
326
327       if (it->second.nbRecv==nbMessage)
328       {
329         int size ;
330         MPI_Comm_size(intraComm,&size) ;
331         MPI_Request* requests= new MPI_Request[size-1] ;
332         MPI_Status* status= new MPI_Status[size-1] ;
333
334         for(int i=1;i<size;i++)
335         {
336            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
337         }
338         MPI_Waitall(size-1,requests,status) ;
339         registerContext(buff,count,it->second.leaderRank) ;
340
341         recvContextId.erase(it) ;
342         delete [] requests ;
343         delete [] status ;
344
345       }
346     }
347
348     void CServer::listenRootContext(void)
349     {
350
351       MPI_Status status ;
352       int flag ;
353       static char* buffer ;
354       static MPI_Request request ;
355       static bool recept=false ;
356       int rank ;
357       int count ;
358       const int root=0 ;
359
360       if (recept==false)
361       {
362         traceOff() ;
363         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
364         traceOn() ;
365         if (flag==true)
366         {
367           MPI_Get_count(&status,MPI_CHAR,&count) ;
368           buffer=new char[count] ;
369           MPI_Irecv((void*)buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
370           recept=true ;
371         }
372       }
373       else
374       {
375         MPI_Test(&request,&flag,&status) ;
376         if (flag==true)
377         {
378           MPI_Get_count(&status,MPI_CHAR,&count) ;
379           registerContext((void*)buffer,count) ;
380           delete [] buffer ;
381           recept=false ;
382         }
383       }
384     }
385
386     void CServer::registerContext(void* buff, int count, int leaderRank)
387     {
388       string contextId;
389       CBufferIn buffer(buff, count);
390       buffer >> contextId;
391
392       info(20) << "CServer : Register new Context : " << contextId << endl;
393
394       if (contextList.find(contextId) != contextList.end())
395         ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
396               << "Context '" << contextId << "' has already been registred");
397
398       MPI_Comm contextIntercomm;
399       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm);
400
401       MPI_Comm inter;
402       MPI_Intercomm_merge(contextIntercomm,1,&inter);
403       MPI_Barrier(inter);
404
405       CContext* context=CContext::create(contextId);
406       contextList[contextId]=context;
407       context->initServer(intraComm,contextIntercomm);
408
409       contextInterComms.push_back(contextIntercomm);
410       MPI_Comm_free(&inter);
411     }
412
413     void CServer::contextEventLoop(void)
414     {
415       bool finished ;
416       map<string,CContext*>::iterator it ;
417       for(it=contextList.begin();it!=contextList.end();++it)
418       {
419         finished=it->second->checkBuffersAndListen();
420         if (finished)
421         {
422           contextList.erase(it) ;
423           break ;
424         }
425       }
426
427     }
428
429     //! Get rank of the current process
430     int CServer::getRank()
431     {
432       return rank;
433     }
434
435    /*!
436    * Open a file specified by a suffix and an extension and use it for the given file buffer.
437    * The file name will be suffix+rank+extension.
438    *
439    * \param fileName[in] protype file name
440    * \param ext [in] extension of the file
441    * \param fb [in/out] the file buffer
442    */
443    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
444    {
445      StdStringStream fileNameClient;
446      int numDigit = 0;
447      int size = 0;
448      MPI_Comm_size(CXios::globalComm, &size);
449      while (size)
450      {
451        size /= 10;
452        ++numDigit;
453      }
454
455      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext;
456      fb->open(fileNameClient.str().c_str(), std::ios::out);
457      if (!fb->is_open())
458        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
459              << std::endl << "Can not open <" << fileNameClient << "> file to write the server log(s).");
460    }
461
462    /*!
463    * \brief Open a file stream to write the info logs
464    * Open a file stream with a specific file name suffix+rank
465    * to write the info logs.
466    * \param fileName [in] protype file name
467    */
468    void CServer::openInfoStream(const StdString& fileName)
469    {
470      std::filebuf* fb = m_infoStream.rdbuf();
471      openStream(fileName, ".out", fb);
472
473      info.write2File(fb);
474      report.write2File(fb);
475    }
476
477    //! Write the info logs to standard output
478    void CServer::openInfoStream()
479    {
480      info.write2StdOut();
481      report.write2StdOut();
482    }
483
484    //! Close the info logs file if it opens
485    void CServer::closeInfoStream()
486    {
487      if (m_infoStream.is_open()) m_infoStream.close();
488    }
489
490    /*!
491    * \brief Open a file stream to write the error log
492    * Open a file stream with a specific file name suffix+rank
493    * to write the error log.
494    * \param fileName [in] protype file name
495    */
496    void CServer::openErrorStream(const StdString& fileName)
497    {
498      std::filebuf* fb = m_errorStream.rdbuf();
499      openStream(fileName, ".err", fb);
500
501      error.write2File(fb);
502    }
503
504    //! Write the error log to standard error output
505    void CServer::openErrorStream()
506    {
507      error.write2StdErr();
508    }
509
510    //! Close the error log file if it opens
511    void CServer::closeErrorStream()
512    {
513      if (m_errorStream.is_open()) m_errorStream.close();
514    }
515}
Note: See TracBrowser for help on using the repository browser.