source: XIOS/dev/dev_olga/src/server.cpp @ 1021

Last change on this file since 1021 was 1021, checked in by oabramkina, 7 years ago

Intermeadiate version for merging with new server functionalities.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 18.2 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "client.hpp"
6#include "type.hpp"
7#include "context.hpp"
8#include "object_template.hpp"
9#include "oasis_cinterface.hpp"
10#include <boost/functional/hash.hpp>
11#include <boost/algorithm/string.hpp>
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "event_scheduler.hpp"
16
17namespace xios
18{
19    MPI_Comm CServer::intraComm ;
20    list<MPI_Comm> CServer::interCommLeft ;
21    list<MPI_Comm> CServer::interCommRight ;
22    list<MPI_Comm> CServer::interComm ;
23    std::list<MPI_Comm> CServer::contextInterComms;
24    int CServer::serverLevel = 0 ;
25    int CServer::nbPools = 0;
26    int CServer::poolId = 0;
27    int CServer::serverSize = 0;
28    bool CServer::isRoot = false ;
29    int CServer::rank = INVALID_RANK;
30    StdOFStream CServer::m_infoStream;
31    StdOFStream CServer::m_errorStream;
32    map<string,CContext*> CServer::contextList ;
33    bool CServer::finished=false ;
34    bool CServer::is_MPI_Initialized ;
35    CEventScheduler* CServer::eventScheduler = 0;
36
37//---------------------------------------------------------------
38/*!
39 * \fn void CServer::initialize(void)
40 * Creates intraComm and interComm for a server pool (primary or secondary).
41 */
42    void CServer::initialize(void)
43    {
44      int initialized ;
45      MPI_Initialized(&initialized) ;
46      if (initialized) is_MPI_Initialized=true ;
47      else is_MPI_Initialized=false ;
48
49      // Not using OASIS
50      if (!CXios::usingOasis)
51      {
52
53        if (!is_MPI_Initialized)
54        {
55          MPI_Init(NULL, NULL);
56        }
57        CTimer::get("XIOS").resume() ;
58
59        boost::hash<string> hashString ;
60//        unsigned long hashServer1 = hashString(CXios::xiosCodeIdPrm);
61//        unsigned long hashServer2 = hashString(CXios::xiosCodeIdSnd);
62//        unsigned long hashServer = (CXios::serverLevel < 2)  ? hashServer1 : hashServer2;
63        unsigned long hashServer = hashString(CXios::xiosCodeId);
64
65        unsigned long* hashAll ;
66//        unsigned long* hashAllServers ;
67
68//        int rank ;
69        int size ;
70        int myColor ;
71        int i,c ;
72        MPI_Comm newComm, serversComm;
73
74        MPI_Comm_size(CXios::globalComm, &size) ;
75        MPI_Comm_rank(CXios::globalComm, &rank);
76
77        hashAll=new unsigned long[size] ;
78        MPI_Allgather(&hashServer, 1, MPI_LONG, hashAll, 1, MPI_LONG, CXios::globalComm) ;
79
80        map<unsigned long, int> colors ;
81        map<unsigned long, int> leaders ;
82        map<unsigned long, int>::iterator it ;
83
84        for(i=0,c=0;i<size;i++)
85        {
86          if (colors.find(hashAll[i])==colors.end())
87          {
88            colors[hashAll[i]]=c ;
89            leaders[hashAll[i]]=i ;
90            c++ ;
91          }
92        }
93
94        // Setting the number of secondary pools
95        myColor = colors[hashServer];
96        if (CXios::usingServer2)
97        {
98          int serverRank = rank - leaders[hashServer]; // server proc rank starting 0
99          serverSize = size - leaders[hashServer];
100          nbPools = serverSize * CXios::ratioServer2 / 100;
101          if ( serverRank < (serverSize - nbPools) )
102          {
103            serverLevel = 1;
104          }
105          else
106          {
107            serverLevel = 2;
108            poolId = serverRank - serverSize + nbPools;
109            myColor = rank;
110          }
111        }
112
113        MPI_Comm_split(CXios::globalComm, myColor, rank, &intraComm) ;
114
115        if (serverLevel == 0)
116        {
117          int clientLeader;
118          for(it=leaders.begin();it!=leaders.end();it++)
119          {
120            if (it->first!=hashServer)
121            {
122              clientLeader=it->second ;
123              int intraCommSize, intraCommRank ;
124              MPI_Comm_size(intraComm,&intraCommSize) ;
125              MPI_Comm_rank(intraComm,&intraCommRank) ;
126              info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
127                       <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
128
129              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
130               interCommLeft.push_back(newComm) ;
131               interComm.push_back(newComm) ;
132            }
133          }
134        }
135        else if (serverLevel == 1)
136        {
137          int clientLeader, srvPrmLeader, srvSndLeader;
138          for (it=leaders.begin();it!=leaders.end();it++)
139          {
140            if (it->first != hashServer)
141            {
142              clientLeader=it->second ;
143              int intraCommSize, intraCommRank ;
144              MPI_Comm_size(intraComm, &intraCommSize) ;
145              MPI_Comm_rank(intraComm, &intraCommRank) ;
146              info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
147                       <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
148              MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
149              interCommLeft.push_back(newComm) ;
150              interComm.push_back(newComm) ;
151            }
152            else
153              srvPrmLeader = it->second;
154          }
155
156          for (int i = 0; i < nbPools; ++i)
157          {
158            srvSndLeader = srvPrmLeader + serverSize - nbPools + i;
159//            CClient::initializeClientOnServer(rank, serversComm, srvSndLeader);
160            CClient::initializeClientOnServer(rank, intraComm, srvSndLeader);
161            interCommRight.push_back(CClient::getInterComm());
162            interComm.push_back(CClient::getInterComm());
163          }
164        } // primary server
165        else
166        {
167          int clientLeader;
168          clientLeader = leaders[hashString(CXios::xiosCodeId)];
169          int intraCommSize, intraCommRank ;
170          MPI_Comm_size(intraComm, &intraCommSize) ;
171          MPI_Comm_rank(intraComm, &intraCommRank) ;
172          info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
173                   <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
174
175          MPI_Intercomm_create(intraComm, 0, CXios::globalComm, clientLeader, 0, &newComm) ;
176          interCommLeft.push_back(newComm) ;
177          interComm.push_back(newComm) ;
178        } // secondary server
179
180        delete [] hashAll ;
181
182      }
183      // using OASIS
184      else
185      {
186//        int rank ,size;
187        int size;
188        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
189
190        CTimer::get("XIOS").resume() ;
191        MPI_Comm localComm;
192        oasis_get_localcomm(localComm);
193        MPI_Comm_dup(localComm, &intraComm);
194
195        MPI_Comm_rank(intraComm,&rank) ;
196        MPI_Comm_size(intraComm,&size) ;
197        string codesId=CXios::getin<string>("oasis_codes_id") ;
198
199        vector<string> splitted ;
200        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
201        vector<string>::iterator it ;
202
203        MPI_Comm newComm ;
204        int globalRank ;
205        MPI_Comm_rank(CXios::globalComm,&globalRank);
206
207        for(it=splitted.begin();it!=splitted.end();it++)
208        {
209          oasis_get_intercomm(newComm,*it) ;
210          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
211          MPI_Comm_remote_size(newComm,&size);
212          interComm.push_back(newComm) ;
213        }
214              oasis_enddef() ;
215      }
216
217      MPI_Comm_rank(intraComm, &rank) ;
218      if (rank==0) isRoot=true;
219      else isRoot=false;
220     
221      eventScheduler = new CEventScheduler(intraComm) ;
222    }
223
224    void CServer::finalize(void)
225    {
226
227      CTimer::get("XIOS").suspend() ;
228     
229      delete eventScheduler ;
230
231      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
232        MPI_Comm_free(&(*it));
233
234//      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++)
235//        MPI_Comm_free(&(*it));
236
237      for (std::list<MPI_Comm>::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++)
238        MPI_Comm_free(&(*it));
239
240      for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
241        MPI_Comm_free(&(*it));
242
243      MPI_Comm_free(&intraComm);
244
245      if (!is_MPI_Initialized)
246      {
247        if (CXios::usingOasis) oasis_finalize();
248        else MPI_Finalize() ;
249      }
250      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
251      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
252      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
253    }
254
255     void CServer::eventLoop(void)
256     {
257       bool stop=false ;
258
259       CTimer::get("XIOS server").resume() ;
260       while(!stop)
261       {
262
263         if (isRoot)
264         {
265           listenContext();
266           if (!finished) listenFinalize() ;
267         }
268         else
269         {
270           listenRootContext();
271           if (!finished) listenRootFinalize() ;
272         }
273
274         contextEventLoop() ;
275         if (finished && contextList.empty()) stop=true ;
276         eventScheduler->checkEvent() ;
277
278       }
279       CTimer::get("XIOS server").suspend() ;
280     }
281
282     void CServer::listenFinalize(void)
283     {
284        list<MPI_Comm>::iterator it, itr;
285        int msg ;
286        int flag ;
287
288        for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
289        {
290           MPI_Status status ;
291           traceOff() ;
292           MPI_Iprobe(0,0,*it,&flag,&status) ;
293           traceOn() ;
294           if (flag==true)
295           {
296             MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
297             info(20)<<" CServer : Receive client finalize"<<endl ;
298
299             // If primary server, send finalize to secondary server pool(s)
300             for(itr=interCommRight.begin(); itr!=interCommRight.end(); itr++)
301             {
302               MPI_Send(&msg,1,MPI_INT,0,0,*itr) ;
303//               MPI_Comm_free(&(*itr));
304//               interCommRight.erase(itr) ;
305             }
306
307              MPI_Comm_free(&(*it));
308//              interComm.erase(it) ;
309              interCommLeft.erase(it) ;
310              break ;
311            }
312         }
313
314        if (interCommLeft.empty())
315//        if (interComm.empty())
316         {
317           int i,size ;
318           MPI_Comm_size(intraComm,&size) ;
319           MPI_Request* requests= new MPI_Request[size-1] ;
320           MPI_Status* status= new MPI_Status[size-1] ;
321
322           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
323           MPI_Waitall(size-1,requests,status) ;
324
325           finished=true ;
326           delete [] requests ;
327           delete [] status ;
328         }
329     }
330
331
332     void CServer::listenRootFinalize()
333     {
334        int flag ;
335        MPI_Status status ;
336        int msg ;
337
338        traceOff() ;
339        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
340        traceOn() ;
341        if (flag==true)
342        {
343           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
344           finished=true ;
345        }
346      }
347
348     void CServer::listenContext(void)
349     {
350
351       MPI_Status status ;
352       int flag ;
353       static void* buffer ;
354       static MPI_Request request ;
355       static bool recept=false ;
356       int rank ;
357       int count ;
358
359       if (recept==false)
360       {
361         traceOff() ;
362         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
363         traceOn() ;
364         if (flag==true)
365         {
366           rank=status.MPI_SOURCE ;
367           MPI_Get_count(&status,MPI_CHAR,&count) ;
368           buffer=new char[count] ;
369           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
370           recept=true ;
371         }
372       }
373       else
374       {
375         traceOff() ;
376         MPI_Test(&request,&flag,&status) ;
377         traceOn() ;
378         if (flag==true)
379         {
380           rank=status.MPI_SOURCE ;
381           MPI_Get_count(&status,MPI_CHAR,&count) ;
382           recvContextMessage(buffer,count) ;
383           delete [] buffer ;
384           recept=false ;
385         }
386       }
387     }
388
389     void CServer::recvContextMessage(void* buff,int count)
390     {
391       static map<string,contextMessage> recvContextId;
392
393       map<string,contextMessage>::iterator it ;
394
395       CBufferIn buffer(buff,count) ;
396       string id ;
397       int clientLeader ;
398       int nbMessage ;
399
400       buffer>>id>>nbMessage>>clientLeader ;
401
402       it=recvContextId.find(id) ;
403       if (it==recvContextId.end())
404       {
405         contextMessage msg={0,0} ;
406         pair<map<string,contextMessage>::iterator,bool> ret ;
407         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
408         it=ret.first ;
409       }
410       it->second.nbRecv+=1 ;
411       it->second.leaderRank+=clientLeader ;
412
413       if (it->second.nbRecv==nbMessage)
414       {
415         int size ;
416         MPI_Comm_size(intraComm,&size) ;
417         MPI_Request* requests= new MPI_Request[size-1] ;
418         MPI_Status* status= new MPI_Status[size-1] ;
419
420         for(int i=1;i<size;i++)
421         {
422            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
423         }
424         MPI_Waitall(size-1,requests,status) ;
425         registerContext(buff,count,it->second.leaderRank) ;
426
427         recvContextId.erase(it) ;
428         delete [] requests ;
429         delete [] status ;
430
431       }
432     }
433
434     void CServer::listenRootContext(void)
435     {
436
437       MPI_Status status ;
438       int flag ;
439       static void* buffer ;
440       static MPI_Request request ;
441       static bool recept=false ;
442       int rank ;
443       int count ;
444       const int root=0 ;
445
446       if (recept==false)
447       {
448         traceOff() ;
449         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
450         traceOn() ;
451         if (flag==true)
452         {
453           MPI_Get_count(&status,MPI_CHAR,&count) ;
454           buffer=new char[count] ;
455           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
456           recept=true ;
457         }
458       }
459       else
460       {
461         MPI_Test(&request,&flag,&status) ;
462         if (flag==true)
463         {
464           MPI_Get_count(&status,MPI_CHAR,&count) ;
465           registerContext(buffer,count) ;
466
467           delete [] buffer ;
468           recept=false ;
469         }
470       }
471     }
472
473     void CServer::registerContext(void* buff, int count, int leaderRank)
474     {
475       string contextId;
476       CBufferIn buffer(buff, count);
477       buffer >> contextId;
478       CContext* context;
479
480       info(20) << "CServer : Register new Context : " << contextId << endl;
481
482       if (contextList.find(contextId) != contextList.end())
483         ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
484               << "Context '" << contextId << "' has already been registred");
485
486       MPI_Comm contextInterComm;
487       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextInterComm);
488
489       MPI_Comm inter;
490       MPI_Intercomm_merge(contextInterComm,1,&inter);
491       MPI_Barrier(inter);
492
493       context=CContext::create(contextId);
494       contextList[contextId]=context;
495       context->initServer(intraComm,contextInterComm);
496       contextInterComms.push_back(contextInterComm);
497
498       if (serverLevel == 1)
499       {
500//         CClient::registerContext(contextId, intraComm);
501         CClient::registerContextByClienOfServer(contextId, intraComm);
502       }
503
504       MPI_Comm_free(&inter);
505
506     }
507
508     void CServer::contextEventLoop(void)
509     {
510       bool finished ;
511       map<string,CContext*>::iterator it ;
512
513       for(it=contextList.begin();it!=contextList.end();it++)
514       {
515         finished=it->second->checkBuffersAndListen();
516         if (finished)
517         {
518           contextList.erase(it) ;
519           break ;
520         }
521       }
522     }
523
524     //! Get rank of the current process
525     int CServer::getRank()
526     {
527       return rank;
528     }
529
530    /*!
531    * Open a file specified by a suffix and an extension and use it for the given file buffer.
532    * The file name will be suffix+rank+extension.
533    *
534    * \param fileName[in] protype file name
535    * \param ext [in] extension of the file
536    * \param fb [in/out] the file buffer
537    */
538    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
539    {
540      StdStringStream fileNameClient;
541      int numDigit = 0;
542      int size = 0;
543      int id;
544      MPI_Comm_size(CXios::globalComm, &size);
545      while (size)
546      {
547        size /= 10;
548        ++numDigit;
549      }
550
551      if (!CXios::usingServer2)
552        id = getRank();
553      else
554      {
555        if (serverLevel == 1)
556          id = getRank();
557        else
558          id = poolId;
559      }
560      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
561      fb->open(fileNameClient.str().c_str(), std::ios::out);
562      if (!fb->is_open())
563        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
564              << std::endl << "Can not open <" << fileNameClient << "> file to write the server log(s).");
565    }
566
567    /*!
568    * \brief Open a file stream to write the info logs
569    * Open a file stream with a specific file name suffix+rank
570    * to write the info logs.
571    * \param fileName [in] protype file name
572    */
573    void CServer::openInfoStream(const StdString& fileName)
574    {
575      std::filebuf* fb = m_infoStream.rdbuf();
576      openStream(fileName, ".out", fb);
577
578      info.write2File(fb);
579      report.write2File(fb);
580    }
581
582    //! Write the info logs to standard output
583    void CServer::openInfoStream()
584    {
585      info.write2StdOut();
586      report.write2StdOut();
587    }
588
589    //! Close the info logs file if it opens
590    void CServer::closeInfoStream()
591    {
592      if (m_infoStream.is_open()) m_infoStream.close();
593    }
594
595    /*!
596    * \brief Open a file stream to write the error log
597    * Open a file stream with a specific file name suffix+rank
598    * to write the error log.
599    * \param fileName [in] protype file name
600    */
601    void CServer::openErrorStream(const StdString& fileName)
602    {
603      std::filebuf* fb = m_errorStream.rdbuf();
604      openStream(fileName, ".err", fb);
605
606      error.write2File(fb);
607    }
608
609    //! Write the error log to standard error output
610    void CServer::openErrorStream()
611    {
612      error.write2StdErr();
613    }
614
615    //! Close the error log file if it opens
616    void CServer::closeErrorStream()
617    {
618      if (m_errorStream.is_open()) m_errorStream.close();
619    }
620}
Note: See TracBrowser for help on using the repository browser.