source: XIOS/dev/dev_olga/src/server.cpp @ 1025

Last change on this file since 1025 was 1021, checked in by oabramkina, 7 years ago

Intermeadiate version for merging with new server functionalities.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 18.2 KB
RevLine 
[490]1#include "globalScopeData.hpp"
[591]2#include "xios_spl.hpp"
[300]3#include "cxios.hpp"
[342]4#include "server.hpp"
[983]5#include "client.hpp"
[300]6#include "type.hpp"
7#include "context.hpp"
[352]8#include "object_template.hpp"
[300]9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
[382]12#include "mpi.hpp"
[347]13#include "tracer.hpp"
14#include "timer.hpp"
[492]15#include "event_scheduler.hpp"
[300]16
[335]17namespace xios
[490]18{
[300]19    MPI_Comm CServer::intraComm ;
[992]20    list<MPI_Comm> CServer::interCommLeft ;
21    list<MPI_Comm> CServer::interCommRight ;
[300]22    list<MPI_Comm> CServer::interComm ;
[655]23    std::list<MPI_Comm> CServer::contextInterComms;
[1021]24    int CServer::serverLevel = 0 ;
25    int CServer::nbPools = 0;
26    int CServer::poolId = 0;
27    int CServer::serverSize = 0;
[983]28    bool CServer::isRoot = false ;
[490]29    int CServer::rank = INVALID_RANK;
30    StdOFStream CServer::m_infoStream;
[523]31    StdOFStream CServer::m_errorStream;
[490]32    map<string,CContext*> CServer::contextList ;
[300]33    bool CServer::finished=false ;
34    bool CServer::is_MPI_Initialized ;
[597]35    CEventScheduler* CServer::eventScheduler = 0;
[983]36
37//---------------------------------------------------------------
38/*!
39 * \fn void CServer::initialize(void)
[1021]40 * Creates intraComm and interComm for a server pool (primary or secondary).
[983]41 */
[300]42    void CServer::initialize(void)
43    {
44      int initialized ;
45      MPI_Initialized(&initialized) ;
46      if (initialized) is_MPI_Initialized=true ;
47      else is_MPI_Initialized=false ;
[490]48
[300]49      // Not using OASIS
50      if (!CXios::usingOasis)
51      {
[490]52
53        if (!is_MPI_Initialized)
[300]54        {
[925]55          MPI_Init(NULL, NULL);
[300]56        }
[359]57        CTimer::get("XIOS").resume() ;
[490]58
59        boost::hash<string> hashString ;
[1021]60//        unsigned long hashServer1 = hashString(CXios::xiosCodeIdPrm);
61//        unsigned long hashServer2 = hashString(CXios::xiosCodeIdSnd);
62//        unsigned long hashServer = (CXios::serverLevel < 2)  ? hashServer1 : hashServer2;
63        unsigned long hashServer = hashString(CXios::xiosCodeId);
[490]64
[300]65        unsigned long* hashAll ;
[1021]66//        unsigned long* hashAllServers ;
[490]67
68//        int rank ;
[300]69        int size ;
70        int myColor ;
71        int i,c ;
[1021]72        MPI_Comm newComm, serversComm;
[490]73
[983]74        MPI_Comm_size(CXios::globalComm, &size) ;
75        MPI_Comm_rank(CXios::globalComm, &rank);
[1009]76
[300]77        hashAll=new unsigned long[size] ;
[983]78        MPI_Allgather(&hashServer, 1, MPI_LONG, hashAll, 1, MPI_LONG, CXios::globalComm) ;
[490]79
[1021]80        map<unsigned long, int> colors ;
[300]81        map<unsigned long, int> leaders ;
82        map<unsigned long, int>::iterator it ;
[490]83
[300]84        for(i=0,c=0;i<size;i++)
85        {
86          if (colors.find(hashAll[i])==colors.end())
87          {
88            colors[hashAll[i]]=c ;
89            leaders[hashAll[i]]=i ;
90            c++ ;
91          }
92        }
[490]93
[1021]94        // Setting the number of secondary pools
95        myColor = colors[hashServer];
96        if (CXios::usingServer2)
[1009]97        {
[1021]98          int serverRank = rank - leaders[hashServer]; // server proc rank starting 0
99          serverSize = size - leaders[hashServer];
100          nbPools = serverSize * CXios::ratioServer2 / 100;
101          if ( serverRank < (serverSize - nbPools) )
[1009]102          {
[1021]103            serverLevel = 1;
[1009]104          }
[1021]105          else
[1009]106          {
[1021]107            serverLevel = 2;
108            poolId = serverRank - serverSize + nbPools;
109            myColor = rank;
[1009]110          }
111        }
112
[1021]113        MPI_Comm_split(CXios::globalComm, myColor, rank, &intraComm) ;
[1009]114
[1021]115        if (serverLevel == 0)
[983]116        {
117          int clientLeader;
118          for(it=leaders.begin();it!=leaders.end();it++)
119          {
120            if (it->first!=hashServer)
121            {
122              clientLeader=it->second ;
123              int intraCommSize, intraCommRank ;
124              MPI_Comm_size(intraComm,&intraCommSize) ;
125              MPI_Comm_rank(intraComm,&intraCommRank) ;
126              info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
127                       <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
[490]128
[1009]129              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
[992]130               interCommLeft.push_back(newComm) ;
[983]131               interComm.push_back(newComm) ;
132            }
133          }
134        }
[1021]135        else if (serverLevel == 1)
[983]136        {
[1021]137          int clientLeader, srvPrmLeader, srvSndLeader;
138          for (it=leaders.begin();it!=leaders.end();it++)
[983]139          {
[1021]140            if (it->first != hashServer)
[983]141            {
[1021]142              clientLeader=it->second ;
143              int intraCommSize, intraCommRank ;
144              MPI_Comm_size(intraComm, &intraCommSize) ;
145              MPI_Comm_rank(intraComm, &intraCommRank) ;
146              info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
147                       <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
148              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
149              interCommLeft.push_back(newComm) ;
150              interComm.push_back(newComm) ;
[983]151            }
152            else
[1021]153              srvPrmLeader = it->second;
154          }
[1009]155
[1021]156          for (int i = 0; i < nbPools; ++i)
[983]157          {
[1021]158            srvSndLeader = srvPrmLeader + serverSize - nbPools + i;
159//            CClient::initializeClientOnServer(rank, serversComm, srvSndLeader);
160            CClient::initializeClientOnServer(rank, intraComm, srvSndLeader);
161            interCommRight.push_back(CClient::getInterComm());
162            interComm.push_back(CClient::getInterComm());
163          }
164        } // primary server
165        else
166        {
167          int clientLeader;
168          clientLeader = leaders[hashString(CXios::xiosCodeId)];
169          int intraCommSize, intraCommRank ;
170          MPI_Comm_size(intraComm, &intraCommSize) ;
171          MPI_Comm_rank(intraComm, &intraCommRank) ;
172          info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
173                   <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
[983]174
[1021]175          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
176          interCommLeft.push_back(newComm) ;
177          interComm.push_back(newComm) ;
178        } // secondary server
[983]179
180        delete [] hashAll ;
181
[300]182      }
183      // using OASIS
184      else
185      {
[490]186//        int rank ,size;
187        int size;
188        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
189
[359]190        CTimer::get("XIOS").resume() ;
[655]191        MPI_Comm localComm;
192        oasis_get_localcomm(localComm);
193        MPI_Comm_dup(localComm, &intraComm);
194
[300]195        MPI_Comm_rank(intraComm,&rank) ;
196        MPI_Comm_size(intraComm,&size) ;
197        string codesId=CXios::getin<string>("oasis_codes_id") ;
[490]198
[300]199        vector<string> splitted ;
[483]200        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
[300]201        vector<string>::iterator it ;
202
203        MPI_Comm newComm ;
204        int globalRank ;
205        MPI_Comm_rank(CXios::globalComm,&globalRank);
[490]206
[300]207        for(it=splitted.begin();it!=splitted.end();it++)
208        {
209          oasis_get_intercomm(newComm,*it) ;
210          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
211          MPI_Comm_remote_size(newComm,&size);
212          interComm.push_back(newComm) ;
213        }
[492]214              oasis_enddef() ;
[300]215      }
[490]216
[1009]217      MPI_Comm_rank(intraComm, &rank) ;
[300]218      if (rank==0) isRoot=true;
[490]219      else isRoot=false;
[492]220     
221      eventScheduler = new CEventScheduler(intraComm) ;
[300]222    }
[490]223
[300]224    void CServer::finalize(void)
225    {
[983]226
[361]227      CTimer::get("XIOS").suspend() ;
[697]228     
[492]229      delete eventScheduler ;
[655]230
231      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
232        MPI_Comm_free(&(*it));
[983]233
[992]234//      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++)
235//        MPI_Comm_free(&(*it));
236
237      for (std::list<MPI_Comm>::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++)
[655]238        MPI_Comm_free(&(*it));
[983]239
[992]240      for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
241        MPI_Comm_free(&(*it));
242
[655]243      MPI_Comm_free(&intraComm);
244
[300]245      if (!is_MPI_Initialized)
[490]246      {
[300]247        if (CXios::usingOasis) oasis_finalize();
248        else MPI_Finalize() ;
249      }
[347]250      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
251      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
252      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
[300]253    }
[490]254
[300]255     void CServer::eventLoop(void)
256     {
257       bool stop=false ;
[490]258
[347]259       CTimer::get("XIOS server").resume() ;
[300]260       while(!stop)
261       {
[983]262
[300]263         if (isRoot)
264         {
265           listenContext();
266           if (!finished) listenFinalize() ;
267         }
268         else
269         {
270           listenRootContext();
271           if (!finished) listenRootFinalize() ;
272         }
[490]273
[300]274         contextEventLoop() ;
275         if (finished && contextList.empty()) stop=true ;
[956]276         eventScheduler->checkEvent() ;
[983]277
[300]278       }
[347]279       CTimer::get("XIOS server").suspend() ;
[300]280     }
[490]281
[300]282     void CServer::listenFinalize(void)
283     {
[992]284        list<MPI_Comm>::iterator it, itr;
[300]285        int msg ;
286        int flag ;
[490]287
[992]288        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
[300]289        {
290           MPI_Status status ;
[347]291           traceOff() ;
[300]292           MPI_Iprobe(0,0,*it,&flag,&status) ;
[347]293           traceOn() ;
[300]294           if (flag==true)
295           {
[992]296             MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
297             info(20)<<" CServer : Receive client finalize"<<endl ;
298
299             // If primary server, send finalize to secondary server pool(s)
300             for(itr=interCommRight.begin(); itr!=interCommRight.end(); itr++)
301             {
302               MPI_Send(&msg,1,MPI_INT,0,0,*itr) ;
303//               MPI_Comm_free(&(*itr));
304//               interCommRight.erase(itr) ;
305             }
306
[655]307              MPI_Comm_free(&(*it));
[992]308//              interComm.erase(it) ;
309              interCommLeft.erase(it) ;
[300]310              break ;
311            }
312         }
[490]313
[992]314        if (interCommLeft.empty())
315//        if (interComm.empty())
[300]316         {
317           int i,size ;
318           MPI_Comm_size(intraComm,&size) ;
319           MPI_Request* requests= new MPI_Request[size-1] ;
320           MPI_Status* status= new MPI_Status[size-1] ;
[490]321
[300]322           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
323           MPI_Waitall(size-1,requests,status) ;
324
325           finished=true ;
326           delete [] requests ;
327           delete [] status ;
328         }
329     }
[490]330
331
[300]332     void CServer::listenRootFinalize()
333     {
334        int flag ;
335        MPI_Status status ;
336        int msg ;
[490]337
[347]338        traceOff() ;
[300]339        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
[347]340        traceOn() ;
[300]341        if (flag==true)
342        {
343           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
344           finished=true ;
345        }
346      }
[490]347
[300]348     void CServer::listenContext(void)
349     {
[490]350
[300]351       MPI_Status status ;
352       int flag ;
353       static void* buffer ;
354       static MPI_Request request ;
355       static bool recept=false ;
356       int rank ;
357       int count ;
[490]358
[300]359       if (recept==false)
360       {
[347]361         traceOff() ;
[300]362         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
[347]363         traceOn() ;
[490]364         if (flag==true)
[300]365         {
366           rank=status.MPI_SOURCE ;
367           MPI_Get_count(&status,MPI_CHAR,&count) ;
368           buffer=new char[count] ;
369           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
[490]370           recept=true ;
[300]371         }
372       }
373       else
374       {
[347]375         traceOff() ;
[300]376         MPI_Test(&request,&flag,&status) ;
[347]377         traceOn() ;
[300]378         if (flag==true)
379         {
380           rank=status.MPI_SOURCE ;
381           MPI_Get_count(&status,MPI_CHAR,&count) ;
382           recvContextMessage(buffer,count) ;
383           delete [] buffer ;
[490]384           recept=false ;
[300]385         }
386       }
387     }
[490]388
[300]389     void CServer::recvContextMessage(void* buff,int count)
390     {
[983]391       static map<string,contextMessage> recvContextId;
392
[300]393       map<string,contextMessage>::iterator it ;
[490]394
[300]395       CBufferIn buffer(buff,count) ;
396       string id ;
397       int clientLeader ;
398       int nbMessage ;
399
400       buffer>>id>>nbMessage>>clientLeader ;
[490]401
[300]402       it=recvContextId.find(id) ;
403       if (it==recvContextId.end())
[490]404       {
[300]405         contextMessage msg={0,0} ;
406         pair<map<string,contextMessage>::iterator,bool> ret ;
407         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
408         it=ret.first ;
[490]409       }
[300]410       it->second.nbRecv+=1 ;
411       it->second.leaderRank+=clientLeader ;
[490]412
[300]413       if (it->second.nbRecv==nbMessage)
[490]414       {
[300]415         int size ;
416         MPI_Comm_size(intraComm,&size) ;
417         MPI_Request* requests= new MPI_Request[size-1] ;
418         MPI_Status* status= new MPI_Status[size-1] ;
[490]419
[300]420         for(int i=1;i<size;i++)
421         {
422            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
423         }
424         MPI_Waitall(size-1,requests,status) ;
425         registerContext(buff,count,it->second.leaderRank) ;
426
427         recvContextId.erase(it) ;
428         delete [] requests ;
429         delete [] status ;
430
431       }
[490]432     }
433
[300]434     void CServer::listenRootContext(void)
435     {
[490]436
[300]437       MPI_Status status ;
438       int flag ;
439       static void* buffer ;
440       static MPI_Request request ;
441       static bool recept=false ;
442       int rank ;
443       int count ;
444       const int root=0 ;
[490]445
[300]446       if (recept==false)
447       {
[347]448         traceOff() ;
[300]449         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
[347]450         traceOn() ;
[490]451         if (flag==true)
[300]452         {
453           MPI_Get_count(&status,MPI_CHAR,&count) ;
454           buffer=new char[count] ;
455           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
[490]456           recept=true ;
[300]457         }
458       }
459       else
460       {
461         MPI_Test(&request,&flag,&status) ;
462         if (flag==true)
463         {
464           MPI_Get_count(&status,MPI_CHAR,&count) ;
465           registerContext(buffer,count) ;
[983]466
[300]467           delete [] buffer ;
[490]468           recept=false ;
[300]469         }
470       }
[490]471     }
472
[655]473     void CServer::registerContext(void* buff, int count, int leaderRank)
[300]474     {
475       string contextId;
[655]476       CBufferIn buffer(buff, count);
477       buffer >> contextId;
[983]478       CContext* context;
[300]479
[680]480       info(20) << "CServer : Register new Context : " << contextId << endl;
[490]481
[680]482       if (contextList.find(contextId) != contextList.end())
483         ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
484               << "Context '" << contextId << "' has already been registred");
[490]485
[983]486       MPI_Comm contextInterComm;
487       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextInterComm);
[490]488
[655]489       MPI_Comm inter;
[983]490       MPI_Intercomm_merge(contextInterComm,1,&inter);
[655]491       MPI_Barrier(inter);
492
[983]493       context=CContext::create(contextId);
[655]494       contextList[contextId]=context;
[983]495       context->initServer(intraComm,contextInterComm);
496       contextInterComms.push_back(contextInterComm);
[655]497
[1021]498       if (serverLevel == 1)
[983]499       {
[1009]500//         CClient::registerContext(contextId, intraComm);
[1021]501         CClient::registerContextByClienOfServer(contextId, intraComm);
[983]502       }
503
[655]504       MPI_Comm_free(&inter);
[983]505
[490]506     }
507
[300]508     void CServer::contextEventLoop(void)
509     {
510       bool finished ;
511       map<string,CContext*>::iterator it ;
[983]512
[490]513       for(it=contextList.begin();it!=contextList.end();it++)
[300]514       {
[597]515         finished=it->second->checkBuffersAndListen();
[300]516         if (finished)
517         {
518           contextList.erase(it) ;
519           break ;
520         }
521       }
522     }
[490]523
524     //! Get rank of the current process
525     int CServer::getRank()
526     {
527       return rank;
528     }
529
[523]530    /*!
531    * Open a file specified by a suffix and an extension and use it for the given file buffer.
532    * The file name will be suffix+rank+extension.
533    *
534    * \param fileName[in] protype file name
535    * \param ext [in] extension of the file
536    * \param fb [in/out] the file buffer
537    */
538    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
539    {
540      StdStringStream fileNameClient;
541      int numDigit = 0;
542      int size = 0;
[1021]543      int id;
[523]544      MPI_Comm_size(CXios::globalComm, &size);
545      while (size)
546      {
547        size /= 10;
548        ++numDigit;
549      }
[497]550
[1021]551      if (!CXios::usingServer2)
552        id = getRank();
[1009]553      else
[1021]554      {
555        if (serverLevel == 1)
556          id = getRank();
557        else
558          id = poolId;
559      }
560      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
[523]561      fb->open(fileNameClient.str().c_str(), std::ios::out);
562      if (!fb->is_open())
563        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
564              << std::endl << "Can not open <" << fileNameClient << "> file to write the server log(s).");
565    }
[490]566
[523]567    /*!
568    * \brief Open a file stream to write the info logs
569    * Open a file stream with a specific file name suffix+rank
570    * to write the info logs.
571    * \param fileName [in] protype file name
572    */
573    void CServer::openInfoStream(const StdString& fileName)
574    {
575      std::filebuf* fb = m_infoStream.rdbuf();
576      openStream(fileName, ".out", fb);
[490]577
[523]578      info.write2File(fb);
579      report.write2File(fb);
580    }
[490]581
[523]582    //! Write the info logs to standard output
583    void CServer::openInfoStream()
584    {
585      info.write2StdOut();
586      report.write2StdOut();
587    }
[490]588
[523]589    //! Close the info logs file if it opens
590    void CServer::closeInfoStream()
591    {
592      if (m_infoStream.is_open()) m_infoStream.close();
593    }
594
595    /*!
596    * \brief Open a file stream to write the error log
597    * Open a file stream with a specific file name suffix+rank
598    * to write the error log.
599    * \param fileName [in] protype file name
600    */
601    void CServer::openErrorStream(const StdString& fileName)
602    {
603      std::filebuf* fb = m_errorStream.rdbuf();
604      openStream(fileName, ".err", fb);
605
606      error.write2File(fb);
607    }
608
609    //! Write the error log to standard error output
610    void CServer::openErrorStream()
611    {
612      error.write2StdErr();
613    }
614
615    //! Close the error log file if it opens
616    void CServer::closeErrorStream()
617    {
618      if (m_errorStream.is_open()) m_errorStream.close();
619    }
[300]620}
Note: See TracBrowser for help on using the repository browser.