source: XIOS/dev/branch_openmp/src/server.cpp @ 1328

Last change on this file since 1328 was 1328, checked in by yushan, 7 years ago

dev_omp

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 14.2 KB
RevLine 
[490]1#include "globalScopeData.hpp"
[591]2#include "xios_spl.hpp"
[300]3#include "cxios.hpp"
[342]4#include "server.hpp"
[300]5#include "type.hpp"
6#include "context.hpp"
[352]7#include "object_template.hpp"
[300]8#include "oasis_cinterface.hpp"
9#include <boost/functional/hash.hpp>
10#include <boost/algorithm/string.hpp>
[1328]11#include "mpi.hpp"
[347]12#include "tracer.hpp"
13#include "timer.hpp"
[492]14#include "event_scheduler.hpp"
[1328]15using namespace ep_lib;
[300]16
[335]17namespace xios
[490]18{
[300]19    MPI_Comm CServer::intraComm ;
20    list<MPI_Comm> CServer::interComm ;
[655]21    std::list<MPI_Comm> CServer::contextInterComms;
[300]22    bool CServer::isRoot ;
[490]23    int CServer::rank = INVALID_RANK;
24    StdOFStream CServer::m_infoStream;
[523]25    StdOFStream CServer::m_errorStream;
[490]26    map<string,CContext*> CServer::contextList ;
[300]27    bool CServer::finished=false ;
28    bool CServer::is_MPI_Initialized ;
[597]29    CEventScheduler* CServer::eventScheduler = 0;
[697]30   
[300]31    void CServer::initialize(void)
32    {
[1328]33     // int initialized ;
34     // MPI_Initialized(&initialized) ;
35     // if (initialized) is_MPI_Initialized=true ;
36     // else is_MPI_Initialized=false ;
37
[300]38      // Not using OASIS
39      if (!CXios::usingOasis)
40      {
[490]41
[1328]42       // if (!is_MPI_Initialized)
43       // {
44       //   MPI_Init(NULL, NULL);
45       // }
[359]46        CTimer::get("XIOS").resume() ;
[490]47
48        boost::hash<string> hashString ;
49
[300]50        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
51        unsigned long* hashAll ;
[490]52
[1328]53//        int rank ;
[300]54        int size ;
55        int myColor ;
56        int i,c ;
57        MPI_Comm newComm ;
[490]58
[300]59        MPI_Comm_size(CXios::globalComm,&size) ;
60        MPI_Comm_rank(CXios::globalComm,&rank);
61        hashAll=new unsigned long[size] ;
[490]62
[300]63        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
64
65        map<unsigned long, int> colors ;
66        map<unsigned long, int> leaders ;
67        map<unsigned long, int>::iterator it ;
[490]68
[300]69        for(i=0,c=0;i<size;i++)
70        {
71          if (colors.find(hashAll[i])==colors.end())
72          {
73            colors[hashAll[i]]=c ;
74            leaders[hashAll[i]]=i ;
75            c++ ;
76          }
77        }
[490]78
[300]79        myColor=colors[hashServer] ;
[1134]80        MPI_Comm_split(CXios::globalComm,myColor,rank,&intraComm) ;
81
[300]82        int serverLeader=leaders[hashServer] ;
83        int clientLeader;
[490]84
[300]85         serverLeader=leaders[hashServer] ;
[1328]86         for(it=leaders.begin();it!=leaders.end();it++)
[300]87         {
88           if (it->first!=hashServer)
89           {
90             clientLeader=it->second ;
[492]91             int intraCommSize, intraCommRank ;
92             MPI_Comm_size(intraComm,&intraCommSize) ;
93             MPI_Comm_rank(intraComm,&intraCommRank) ;
[1187]94             info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
95                     <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
[490]96
[300]97             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
98             interComm.push_back(newComm) ;
99           }
100         }
101
102         delete [] hashAll ;
103      }
104      // using OASIS
[1328]105/*      else
[300]106      {
[1328]107        int rank ,size;
[490]108        int size;
109        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
110
[359]111        CTimer::get("XIOS").resume() ;
[655]112        MPI_Comm localComm;
113        oasis_get_localcomm(localComm);
114        MPI_Comm_dup(localComm, &intraComm);
115
[300]116        MPI_Comm_rank(intraComm,&rank) ;
117        MPI_Comm_size(intraComm,&size) ;
118        string codesId=CXios::getin<string>("oasis_codes_id") ;
[490]119
[300]120        vector<string> splitted ;
[483]121        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
[300]122        vector<string>::iterator it ;
123
124        MPI_Comm newComm ;
125        int globalRank ;
126        MPI_Comm_rank(CXios::globalComm,&globalRank);
[490]127
[300]128        for(it=splitted.begin();it!=splitted.end();it++)
129        {
130          oasis_get_intercomm(newComm,*it) ;
131          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
132          MPI_Comm_remote_size(newComm,&size);
133          interComm.push_back(newComm) ;
134        }
[492]135              oasis_enddef() ;
[300]136      }
[1328]137*/
138//      int rank;
[300]139      MPI_Comm_rank(intraComm,&rank) ;
140      if (rank==0) isRoot=true;
[490]141      else isRoot=false;
[492]142     
143      eventScheduler = new CEventScheduler(intraComm) ;
[300]144    }
[490]145
[300]146    void CServer::finalize(void)
147    {
[361]148      CTimer::get("XIOS").suspend() ;
[697]149     
[492]150      delete eventScheduler ;
[655]151
[1328]152      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
[655]153        MPI_Comm_free(&(*it));
[1328]154      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++)
[655]155        MPI_Comm_free(&(*it));
156      MPI_Comm_free(&intraComm);
157
[300]158      if (!is_MPI_Initialized)
[490]159      {
[300]160        if (CXios::usingOasis) oasis_finalize();
[1328]161        //else MPI_Finalize() ;
[300]162      }
[347]163      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
164      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
165      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
[1205]166      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
[300]167    }
[490]168
[300]169     void CServer::eventLoop(void)
170     {
171       bool stop=false ;
[490]172
[347]173       CTimer::get("XIOS server").resume() ;
[300]174       while(!stop)
175       {
176         if (isRoot)
177         {
178           listenContext();
179           if (!finished) listenFinalize() ;
180         }
181         else
182         {
183           listenRootContext();
[1328]184           if (!finished) listenRootFinalize() ;
[300]185         }
[490]186
[300]187         contextEventLoop() ;
188         if (finished && contextList.empty()) stop=true ;
[956]189         eventScheduler->checkEvent() ;
[300]190       }
[347]191       CTimer::get("XIOS server").suspend() ;
[300]192     }
[490]193
[300]194     void CServer::listenFinalize(void)
195     {
196        list<MPI_Comm>::iterator it;
197        int msg ;
198        int flag ;
[490]199
[1328]200        for(it=interComm.begin();it!=interComm.end();it++)
[300]201        {
202           MPI_Status status ;
[347]203           traceOff() ;
[300]204           MPI_Iprobe(0,0,*it,&flag,&status) ;
[347]205           traceOn() ;
[300]206           if (flag==true)
207           {
208              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
209              info(20)<<" CServer : Receive client finalize"<<endl ;
[655]210              MPI_Comm_free(&(*it));
[300]211              interComm.erase(it) ;
212              break ;
213            }
214         }
[490]215
[300]216         if (interComm.empty())
217         {
218           int i,size ;
219           MPI_Comm_size(intraComm,&size) ;
220           MPI_Request* requests= new MPI_Request[size-1] ;
221           MPI_Status* status= new MPI_Status[size-1] ;
[490]222
[300]223           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
224           MPI_Waitall(size-1,requests,status) ;
225
226           finished=true ;
227           delete [] requests ;
228           delete [] status ;
229         }
230     }
[490]231
232
[300]233     void CServer::listenRootFinalize()
234     {
235        int flag ;
236        MPI_Status status ;
237        int msg ;
[490]238
[347]239        traceOff() ;
[300]240        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
[347]241        traceOn() ;
[300]242        if (flag==true)
243        {
244           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
245           finished=true ;
246        }
247      }
[490]248
[300]249     void CServer::listenContext(void)
250     {
[490]251
[300]252       MPI_Status status ;
253       int flag ;
[1032]254       static char* buffer ;
[300]255       static MPI_Request request ;
256       static bool recept=false ;
257       int rank ;
258       int count ;
[490]259
[300]260       if (recept==false)
261       {
[347]262         traceOff() ;
[1328]263         MPI_Iprobe(-2,1,CXios::globalComm, &flag, &status) ;
[347]264         traceOn() ;
[490]265         if (flag==true)
[300]266         {
[1134]267           #ifdef _usingMPI
[300]268           rank=status.MPI_SOURCE ;
[1134]269           #elif _usingEP
[1328]270           rank=status.ep_src;
[1134]271           #endif
[300]272           MPI_Get_count(&status,MPI_CHAR,&count) ;
273           buffer=new char[count] ;
[1032]274           MPI_Irecv((void*)buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
[490]275           recept=true ;
[300]276         }
277       }
278       else
279       {
[347]280         traceOff() ;
[300]281         MPI_Test(&request,&flag,&status) ;
[347]282         traceOn() ;
[300]283         if (flag==true)
284         {
[1134]285           #ifdef _usingMPI
[300]286           rank=status.MPI_SOURCE ;
[1134]287           #elif _usingEP
[1328]288           rank=status.ep_src;
[1134]289           #endif
[300]290           MPI_Get_count(&status,MPI_CHAR,&count) ;
[1032]291           recvContextMessage((void*)buffer,count) ;
[300]292           delete [] buffer ;
[490]293           recept=false ;
[300]294         }
295       }
296     }
[490]297
[300]298     void CServer::recvContextMessage(void* buff,int count)
299     {
300       static map<string,contextMessage> recvContextId ;
301       map<string,contextMessage>::iterator it ;
[490]302
[300]303       CBufferIn buffer(buff,count) ;
304       string id ;
305       int clientLeader ;
306       int nbMessage ;
307
308       buffer>>id>>nbMessage>>clientLeader ;
[490]309
[300]310       it=recvContextId.find(id) ;
311       if (it==recvContextId.end())
[490]312       {
[300]313         contextMessage msg={0,0} ;
314         pair<map<string,contextMessage>::iterator,bool> ret ;
315         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
316         it=ret.first ;
[490]317       }
[300]318       it->second.nbRecv+=1 ;
319       it->second.leaderRank+=clientLeader ;
[490]320
[300]321       if (it->second.nbRecv==nbMessage)
[490]322       {
[300]323         int size ;
324         MPI_Comm_size(intraComm,&size) ;
325         MPI_Request* requests= new MPI_Request[size-1] ;
326         MPI_Status* status= new MPI_Status[size-1] ;
[490]327
[300]328         for(int i=1;i<size;i++)
329         {
330            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
331         }
332         MPI_Waitall(size-1,requests,status) ;
333         registerContext(buff,count,it->second.leaderRank) ;
334
335         recvContextId.erase(it) ;
336         delete [] requests ;
337         delete [] status ;
338
339       }
[490]340     }
341
[300]342     void CServer::listenRootContext(void)
343     {
[490]344
[300]345       MPI_Status status ;
346       int flag ;
[1032]347       static char* buffer ;
[300]348       static MPI_Request request ;
349       static bool recept=false ;
350       int rank ;
351       int count ;
352       const int root=0 ;
[490]353
[300]354       if (recept==false)
355       {
[347]356         traceOff() ;
[300]357         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
[347]358         traceOn() ;
[490]359         if (flag==true)
[300]360         {
361           MPI_Get_count(&status,MPI_CHAR,&count) ;
362           buffer=new char[count] ;
[1032]363           MPI_Irecv((void*)buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
[490]364           recept=true ;
[300]365         }
366       }
367       else
368       {
369         MPI_Test(&request,&flag,&status) ;
370         if (flag==true)
371         {
372           MPI_Get_count(&status,MPI_CHAR,&count) ;
[1032]373           registerContext((void*)buffer,count) ;
[300]374           delete [] buffer ;
[490]375           recept=false ;
[300]376         }
377       }
[490]378     }
379
[655]380     void CServer::registerContext(void* buff, int count, int leaderRank)
[300]381     {
382       string contextId;
[655]383       CBufferIn buffer(buff, count);
384       buffer >> contextId;
[300]385
[680]386       info(20) << "CServer : Register new Context : " << contextId << endl;
[490]387
[680]388       if (contextList.find(contextId) != contextList.end())
389         ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
390               << "Context '" << contextId << "' has already been registred");
[490]391
[655]392       MPI_Comm contextIntercomm;
393       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm);
[490]394
[655]395       MPI_Comm inter;
396       MPI_Intercomm_merge(contextIntercomm,1,&inter);
397       MPI_Barrier(inter);
398
399       CContext* context=CContext::create(contextId);
400       contextList[contextId]=context;
401       context->initServer(intraComm,contextIntercomm);
402
403       contextInterComms.push_back(contextIntercomm);
404       MPI_Comm_free(&inter);
[490]405     }
406
[300]407     void CServer::contextEventLoop(void)
408     {
409       bool finished ;
410       map<string,CContext*>::iterator it ;
[1328]411       for(it=contextList.begin();it!=contextList.end();it++)
[300]412       {
[597]413         finished=it->second->checkBuffersAndListen();
[300]414         if (finished)
415         {
416           contextList.erase(it) ;
417           break ;
418         }
419       }
[490]420
[300]421     }
[490]422
423     //! Get rank of the current process
424     int CServer::getRank()
425     {
426       return rank;
427     }
428
[523]429    /*!
430    * Open a file specified by a suffix and an extension and use it for the given file buffer.
431    * The file name will be suffix+rank+extension.
432    *
433    * \param fileName[in] protype file name
434    * \param ext [in] extension of the file
435    * \param fb [in/out] the file buffer
436    */
437    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
438    {
439      StdStringStream fileNameClient;
440      int numDigit = 0;
441      int size = 0;
442      MPI_Comm_size(CXios::globalComm, &size);
443      while (size)
444      {
445        size /= 10;
446        ++numDigit;
447      }
[497]448
[523]449      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext;
450      fb->open(fileNameClient.str().c_str(), std::ios::out);
451      if (!fb->is_open())
452        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
453              << std::endl << "Can not open <" << fileNameClient << "> file to write the server log(s).");
454    }
[490]455
[523]456    /*!
457    * \brief Open a file stream to write the info logs
458    * Open a file stream with a specific file name suffix+rank
459    * to write the info logs.
460    * \param fileName [in] protype file name
461    */
462    void CServer::openInfoStream(const StdString& fileName)
463    {
464      std::filebuf* fb = m_infoStream.rdbuf();
465      openStream(fileName, ".out", fb);
[490]466
[523]467      info.write2File(fb);
468      report.write2File(fb);
469    }
[490]470
[523]471    //! Write the info logs to standard output
472    void CServer::openInfoStream()
473    {
474      info.write2StdOut();
475      report.write2StdOut();
476    }
[490]477
[523]478    //! Close the info logs file if it opens
479    void CServer::closeInfoStream()
480    {
481      if (m_infoStream.is_open()) m_infoStream.close();
482    }
483
484    /*!
485    * \brief Open a file stream to write the error log
486    * Open a file stream with a specific file name suffix+rank
487    * to write the error log.
488    * \param fileName [in] protype file name
489    */
490    void CServer::openErrorStream(const StdString& fileName)
491    {
492      std::filebuf* fb = m_errorStream.rdbuf();
493      openStream(fileName, ".err", fb);
494
495      error.write2File(fb);
496    }
497
498    //! Write the error log to standard error output
499    void CServer::openErrorStream()
500    {
501      error.write2StdErr();
502    }
503
504    //! Close the error log file if it opens
505    void CServer::closeErrorStream()
506    {
507      if (m_errorStream.is_open()) m_errorStream.close();
508    }
[300]509}
Note: See TracBrowser for help on using the repository browser.