source: XIOS/dev/branch_openmp/src/server.cpp @ 1328

Last change on this file since 1328 was 1328, checked in by yushan, 6 years ago

dev_omp

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 14.2 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "type.hpp"
6#include "context.hpp"
7#include "object_template.hpp"
8#include "oasis_cinterface.hpp"
9#include <boost/functional/hash.hpp>
10#include <boost/algorithm/string.hpp>
11#include "mpi.hpp"
12#include "tracer.hpp"
13#include "timer.hpp"
14#include "event_scheduler.hpp"
15using namespace ep_lib;
16
17namespace xios
18{
19    MPI_Comm CServer::intraComm ;
20    list<MPI_Comm> CServer::interComm ;
21    std::list<MPI_Comm> CServer::contextInterComms;
22    bool CServer::isRoot ;
23    int CServer::rank = INVALID_RANK;
24    StdOFStream CServer::m_infoStream;
25    StdOFStream CServer::m_errorStream;
26    map<string,CContext*> CServer::contextList ;
27    bool CServer::finished=false ;
28    bool CServer::is_MPI_Initialized ;
29    CEventScheduler* CServer::eventScheduler = 0;
30   
31    void CServer::initialize(void)
32    {
33     // int initialized ;
34     // MPI_Initialized(&initialized) ;
35     // if (initialized) is_MPI_Initialized=true ;
36     // else is_MPI_Initialized=false ;
37
38      // Not using OASIS
39      if (!CXios::usingOasis)
40      {
41
42       // if (!is_MPI_Initialized)
43       // {
44       //   MPI_Init(NULL, NULL);
45       // }
46        CTimer::get("XIOS").resume() ;
47
48        boost::hash<string> hashString ;
49
50        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
51        unsigned long* hashAll ;
52
53//        int rank ;
54        int size ;
55        int myColor ;
56        int i,c ;
57        MPI_Comm newComm ;
58
59        MPI_Comm_size(CXios::globalComm,&size) ;
60        MPI_Comm_rank(CXios::globalComm,&rank);
61        hashAll=new unsigned long[size] ;
62
63        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
64
65        map<unsigned long, int> colors ;
66        map<unsigned long, int> leaders ;
67        map<unsigned long, int>::iterator it ;
68
69        for(i=0,c=0;i<size;i++)
70        {
71          if (colors.find(hashAll[i])==colors.end())
72          {
73            colors[hashAll[i]]=c ;
74            leaders[hashAll[i]]=i ;
75            c++ ;
76          }
77        }
78
79        myColor=colors[hashServer] ;
80        MPI_Comm_split(CXios::globalComm,myColor,rank,&intraComm) ;
81
82        int serverLeader=leaders[hashServer] ;
83        int clientLeader;
84
85         serverLeader=leaders[hashServer] ;
86         for(it=leaders.begin();it!=leaders.end();it++)
87         {
88           if (it->first!=hashServer)
89           {
90             clientLeader=it->second ;
91             int intraCommSize, intraCommRank ;
92             MPI_Comm_size(intraComm,&intraCommSize) ;
93             MPI_Comm_rank(intraComm,&intraCommRank) ;
94             info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
95                     <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
96
97             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
98             interComm.push_back(newComm) ;
99           }
100         }
101
102         delete [] hashAll ;
103      }
104      // using OASIS
105/*      else
106      {
107        int rank ,size;
108        int size;
109        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
110
111        CTimer::get("XIOS").resume() ;
112        MPI_Comm localComm;
113        oasis_get_localcomm(localComm);
114        MPI_Comm_dup(localComm, &intraComm);
115
116        MPI_Comm_rank(intraComm,&rank) ;
117        MPI_Comm_size(intraComm,&size) ;
118        string codesId=CXios::getin<string>("oasis_codes_id") ;
119
120        vector<string> splitted ;
121        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
122        vector<string>::iterator it ;
123
124        MPI_Comm newComm ;
125        int globalRank ;
126        MPI_Comm_rank(CXios::globalComm,&globalRank);
127
128        for(it=splitted.begin();it!=splitted.end();it++)
129        {
130          oasis_get_intercomm(newComm,*it) ;
131          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
132          MPI_Comm_remote_size(newComm,&size);
133          interComm.push_back(newComm) ;
134        }
135              oasis_enddef() ;
136      }
137*/
138//      int rank;
139      MPI_Comm_rank(intraComm,&rank) ;
140      if (rank==0) isRoot=true;
141      else isRoot=false;
142     
143      eventScheduler = new CEventScheduler(intraComm) ;
144    }
145
146    void CServer::finalize(void)
147    {
148      CTimer::get("XIOS").suspend() ;
149     
150      delete eventScheduler ;
151
152      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
153        MPI_Comm_free(&(*it));
154      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++)
155        MPI_Comm_free(&(*it));
156      MPI_Comm_free(&intraComm);
157
158      if (!is_MPI_Initialized)
159      {
160        if (CXios::usingOasis) oasis_finalize();
161        //else MPI_Finalize() ;
162      }
163      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
164      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
165      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
166      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
167    }
168
169     void CServer::eventLoop(void)
170     {
171       bool stop=false ;
172
173       CTimer::get("XIOS server").resume() ;
174       while(!stop)
175       {
176         if (isRoot)
177         {
178           listenContext();
179           if (!finished) listenFinalize() ;
180         }
181         else
182         {
183           listenRootContext();
184           if (!finished) listenRootFinalize() ;
185         }
186
187         contextEventLoop() ;
188         if (finished && contextList.empty()) stop=true ;
189         eventScheduler->checkEvent() ;
190       }
191       CTimer::get("XIOS server").suspend() ;
192     }
193
194     void CServer::listenFinalize(void)
195     {
196        list<MPI_Comm>::iterator it;
197        int msg ;
198        int flag ;
199
200        for(it=interComm.begin();it!=interComm.end();it++)
201        {
202           MPI_Status status ;
203           traceOff() ;
204           MPI_Iprobe(0,0,*it,&flag,&status) ;
205           traceOn() ;
206           if (flag==true)
207           {
208              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
209              info(20)<<" CServer : Receive client finalize"<<endl ;
210              MPI_Comm_free(&(*it));
211              interComm.erase(it) ;
212              break ;
213            }
214         }
215
216         if (interComm.empty())
217         {
218           int i,size ;
219           MPI_Comm_size(intraComm,&size) ;
220           MPI_Request* requests= new MPI_Request[size-1] ;
221           MPI_Status* status= new MPI_Status[size-1] ;
222
223           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
224           MPI_Waitall(size-1,requests,status) ;
225
226           finished=true ;
227           delete [] requests ;
228           delete [] status ;
229         }
230     }
231
232
233     void CServer::listenRootFinalize()
234     {
235        int flag ;
236        MPI_Status status ;
237        int msg ;
238
239        traceOff() ;
240        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
241        traceOn() ;
242        if (flag==true)
243        {
244           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
245           finished=true ;
246        }
247      }
248
249     void CServer::listenContext(void)
250     {
251
252       MPI_Status status ;
253       int flag ;
254       static char* buffer ;
255       static MPI_Request request ;
256       static bool recept=false ;
257       int rank ;
258       int count ;
259
260       if (recept==false)
261       {
262         traceOff() ;
263         MPI_Iprobe(-2,1,CXios::globalComm, &flag, &status) ;
264         traceOn() ;
265         if (flag==true)
266         {
267           #ifdef _usingMPI
268           rank=status.MPI_SOURCE ;
269           #elif _usingEP
270           rank=status.ep_src;
271           #endif
272           MPI_Get_count(&status,MPI_CHAR,&count) ;
273           buffer=new char[count] ;
274           MPI_Irecv((void*)buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
275           recept=true ;
276         }
277       }
278       else
279       {
280         traceOff() ;
281         MPI_Test(&request,&flag,&status) ;
282         traceOn() ;
283         if (flag==true)
284         {
285           #ifdef _usingMPI
286           rank=status.MPI_SOURCE ;
287           #elif _usingEP
288           rank=status.ep_src;
289           #endif
290           MPI_Get_count(&status,MPI_CHAR,&count) ;
291           recvContextMessage((void*)buffer,count) ;
292           delete [] buffer ;
293           recept=false ;
294         }
295       }
296     }
297
298     void CServer::recvContextMessage(void* buff,int count)
299     {
300       static map<string,contextMessage> recvContextId ;
301       map<string,contextMessage>::iterator it ;
302
303       CBufferIn buffer(buff,count) ;
304       string id ;
305       int clientLeader ;
306       int nbMessage ;
307
308       buffer>>id>>nbMessage>>clientLeader ;
309
310       it=recvContextId.find(id) ;
311       if (it==recvContextId.end())
312       {
313         contextMessage msg={0,0} ;
314         pair<map<string,contextMessage>::iterator,bool> ret ;
315         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
316         it=ret.first ;
317       }
318       it->second.nbRecv+=1 ;
319       it->second.leaderRank+=clientLeader ;
320
321       if (it->second.nbRecv==nbMessage)
322       {
323         int size ;
324         MPI_Comm_size(intraComm,&size) ;
325         MPI_Request* requests= new MPI_Request[size-1] ;
326         MPI_Status* status= new MPI_Status[size-1] ;
327
328         for(int i=1;i<size;i++)
329         {
330            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
331         }
332         MPI_Waitall(size-1,requests,status) ;
333         registerContext(buff,count,it->second.leaderRank) ;
334
335         recvContextId.erase(it) ;
336         delete [] requests ;
337         delete [] status ;
338
339       }
340     }
341
342     void CServer::listenRootContext(void)
343     {
344
345       MPI_Status status ;
346       int flag ;
347       static char* buffer ;
348       static MPI_Request request ;
349       static bool recept=false ;
350       int rank ;
351       int count ;
352       const int root=0 ;
353
354       if (recept==false)
355       {
356         traceOff() ;
357         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
358         traceOn() ;
359         if (flag==true)
360         {
361           MPI_Get_count(&status,MPI_CHAR,&count) ;
362           buffer=new char[count] ;
363           MPI_Irecv((void*)buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
364           recept=true ;
365         }
366       }
367       else
368       {
369         MPI_Test(&request,&flag,&status) ;
370         if (flag==true)
371         {
372           MPI_Get_count(&status,MPI_CHAR,&count) ;
373           registerContext((void*)buffer,count) ;
374           delete [] buffer ;
375           recept=false ;
376         }
377       }
378     }
379
380     void CServer::registerContext(void* buff, int count, int leaderRank)
381     {
382       string contextId;
383       CBufferIn buffer(buff, count);
384       buffer >> contextId;
385
386       info(20) << "CServer : Register new Context : " << contextId << endl;
387
388       if (contextList.find(contextId) != contextList.end())
389         ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
390               << "Context '" << contextId << "' has already been registred");
391
392       MPI_Comm contextIntercomm;
393       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm);
394
395       MPI_Comm inter;
396       MPI_Intercomm_merge(contextIntercomm,1,&inter);
397       MPI_Barrier(inter);
398
399       CContext* context=CContext::create(contextId);
400       contextList[contextId]=context;
401       context->initServer(intraComm,contextIntercomm);
402
403       contextInterComms.push_back(contextIntercomm);
404       MPI_Comm_free(&inter);
405     }
406
407     void CServer::contextEventLoop(void)
408     {
409       bool finished ;
410       map<string,CContext*>::iterator it ;
411       for(it=contextList.begin();it!=contextList.end();it++)
412       {
413         finished=it->second->checkBuffersAndListen();
414         if (finished)
415         {
416           contextList.erase(it) ;
417           break ;
418         }
419       }
420
421     }
422
423     //! Get rank of the current process
424     int CServer::getRank()
425     {
426       return rank;
427     }
428
429    /*!
430    * Open a file specified by a suffix and an extension and use it for the given file buffer.
431    * The file name will be suffix+rank+extension.
432    *
433    * \param fileName[in] protype file name
434    * \param ext [in] extension of the file
435    * \param fb [in/out] the file buffer
436    */
437    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
438    {
439      StdStringStream fileNameClient;
440      int numDigit = 0;
441      int size = 0;
442      MPI_Comm_size(CXios::globalComm, &size);
443      while (size)
444      {
445        size /= 10;
446        ++numDigit;
447      }
448
449      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext;
450      fb->open(fileNameClient.str().c_str(), std::ios::out);
451      if (!fb->is_open())
452        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
453              << std::endl << "Can not open <" << fileNameClient << "> file to write the server log(s).");
454    }
455
456    /*!
457    * \brief Open a file stream to write the info logs
458    * Open a file stream with a specific file name suffix+rank
459    * to write the info logs.
460    * \param fileName [in] protype file name
461    */
462    void CServer::openInfoStream(const StdString& fileName)
463    {
464      std::filebuf* fb = m_infoStream.rdbuf();
465      openStream(fileName, ".out", fb);
466
467      info.write2File(fb);
468      report.write2File(fb);
469    }
470
471    //! Write the info logs to standard output
472    void CServer::openInfoStream()
473    {
474      info.write2StdOut();
475      report.write2StdOut();
476    }
477
478    //! Close the info logs file if it opens
479    void CServer::closeInfoStream()
480    {
481      if (m_infoStream.is_open()) m_infoStream.close();
482    }
483
484    /*!
485    * \brief Open a file stream to write the error log
486    * Open a file stream with a specific file name suffix+rank
487    * to write the error log.
488    * \param fileName [in] protype file name
489    */
490    void CServer::openErrorStream(const StdString& fileName)
491    {
492      std::filebuf* fb = m_errorStream.rdbuf();
493      openStream(fileName, ".err", fb);
494
495      error.write2File(fb);
496    }
497
498    //! Write the error log to standard error output
499    void CServer::openErrorStream()
500    {
501      error.write2StdErr();
502    }
503
504    //! Close the error log file if it opens
505    void CServer::closeErrorStream()
506    {
507      if (m_errorStream.is_open()) m_errorStream.close();
508    }
509}
Note: See TracBrowser for help on using the repository browser.