source: XIOS/dev/branch_yushan_merged/src/server.cpp @ 1146

Last change on this file since 1146 was 1146, checked in by yushan, 7 years ago

save modif

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 14.2 KB
Line 
1#include "globalScopeData.hpp"
2#include "xios_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "type.hpp"
6#include "context.hpp"
7#include "object_template.hpp"
8#include "oasis_cinterface.hpp"
9#include <boost/functional/hash.hpp>
10#include <boost/algorithm/string.hpp>
11#include "tracer.hpp"
12#include "timer.hpp"
13#include "event_scheduler.hpp"
14
15namespace xios
16{
17    MPI_Comm CServer::intraComm ;
18    list<MPI_Comm> CServer::interComm ;
19    std::list<MPI_Comm> CServer::contextInterComms;
20    bool CServer::isRoot ;
21    int CServer::rank = INVALID_RANK;
22    StdOFStream CServer::m_infoStream;
23    StdOFStream CServer::m_errorStream;
24    map<string,CContext*> CServer::contextList ;
25    bool CServer::finished=false ;
26    bool CServer::is_MPI_Initialized ;
27
28   
29    CEventScheduler* CServer::eventScheduler = 0;
30   
31    void CServer::initialize(void)
32    {
33      // Not using OASIS
34      if (!CXios::usingOasis)
35      {
36
37        CTimer::get("XIOS").resume() ;
38
39        boost::hash<string> hashString ;
40
41        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
42        unsigned long* hashAll ;
43
44
45        int size ;
46        int myColor ;
47        int i,c ;
48        MPI_Comm newComm ;
49
50        MPI_Comm_size(CXios::globalComm,&size) ;
51       
52        //size = CXios::globalComm.ep_comm_ptr->size_rank_info[0].second;
53        printf("global size = %d, size= %d\n", CXios::globalComm.ep_comm_ptr->size_rank_info[0].second, size);
54       
55        MPI_Comm_rank(CXios::globalComm,&rank);
56        hashAll=new unsigned long[size] ;
57
58        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
59
60        map<unsigned long, int> colors ;
61        map<unsigned long, int> leaders ;
62        map<unsigned long, int>::iterator it ;
63
64        for(i=0,c=0;i<size;i++)
65        {
66          if (colors.find(hashAll[i])==colors.end())
67          {
68            colors[hashAll[i]]=c ;
69            leaders[hashAll[i]]=i ;
70            c++ ;
71          }
72        }
73
74        myColor=colors[hashServer] ;
75
76
77        MPI_Comm_split(CXios::globalComm,myColor,rank,&intraComm) ;
78
79       
80        int serverLeader=leaders[hashServer] ;
81        int clientLeader;
82
83         serverLeader=leaders[hashServer] ;
84         for(it=leaders.begin();it!=leaders.end();++it)
85         {
86           if (it->first!=hashServer)
87           {
88             clientLeader=it->second ;
89             int intraCommSize, intraCommRank ;
90             MPI_Comm_size(intraComm,&intraCommSize) ;
91             MPI_Comm_rank(intraComm,&intraCommRank) ;
92             info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
93                     <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
94
95             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
96             interComm.push_back(newComm) ;
97           }
98         }
99
100         delete [] hashAll ;
101      }
102      // using OASIS
103      else
104      {
105        int size;
106        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
107
108        CTimer::get("XIOS").resume() ;
109        MPI_Comm localComm;
110        oasis_get_localcomm(localComm);
111        MPI_Comm_dup(localComm, &intraComm);
112
113        MPI_Comm_rank(intraComm,&rank) ;
114        MPI_Comm_size(intraComm,&size) ;
115        string codesId=CXios::getin<string>("oasis_codes_id") ;
116
117        vector<string> splitted ;
118        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
119        vector<string>::iterator it ;
120
121        MPI_Comm newComm ;
122        int globalRank ;
123        MPI_Comm_rank(CXios::globalComm,&globalRank);
124
125        for(it=splitted.begin();it!=splitted.end();it++)
126        {
127          oasis_get_intercomm(newComm,*it) ;
128          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
129          MPI_Comm_remote_size(newComm,&size);
130          interComm.push_back(newComm) ;
131        }
132              oasis_enddef() ;
133      }
134
135      MPI_Comm_rank(intraComm,&rank) ;
136      if (rank==0) isRoot=true;
137      else isRoot=false;
138     
139      eventScheduler = new CEventScheduler(intraComm) ;
140    }
141
142    void CServer::finalize(void)
143    {
144      CTimer::get("XIOS").suspend() ;
145     
146      delete eventScheduler ;
147
148      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); ++it)
149        MPI_Comm_free(&(*it));
150      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); ++it)
151        MPI_Comm_free(&(*it));
152
153      MPI_Comm_free(&intraComm);
154
155      if (!is_MPI_Initialized)
156      {
157        if (CXios::usingOasis) oasis_finalize();
158        //else  {MPI_Finalize() ;}
159      }
160
161     
162      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
163      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
164      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
165    }
166
167     void CServer::eventLoop(void)
168     {
169       bool stop=false ;
170
171       CTimer::get("XIOS server").resume() ;
172       while(!stop)
173       {
174         if (isRoot)
175         {
176           listenContext();
177           if (!finished) listenFinalize() ;
178         }
179         else
180         {
181           listenRootContext();
182           if (!finished) 
183           {
184             listenRootFinalize() ;
185           }
186         }
187
188         contextEventLoop() ;
189         if (finished && contextList.empty()) stop=true ;
190         
191         eventScheduler->checkEvent() ;
192       }
193       
194       
195       CTimer::get("XIOS server").suspend() ;
196     }
197
198     void CServer::listenFinalize(void)
199     {
200        list<MPI_Comm>::iterator it;
201        int msg ;
202        int flag ;
203
204        for(it=interComm.begin();it!=interComm.end();++it)
205        {
206           MPI_Status status ;
207           traceOff() ;
208           MPI_Iprobe(0,0,*it,&flag,&status) ;
209           traceOn() ;
210           if (flag==true)
211           {
212              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
213              info(20)<<" CServer : Receive client finalize"<<endl ;
214
215              MPI_Comm_free(&(*it));
216              interComm.erase(it) ;
217              break ;
218            }
219         }
220
221         if (interComm.empty())
222         {
223           int i,size ;
224           MPI_Comm_size(intraComm,&size) ;
225           MPI_Request* requests= new MPI_Request[size-1] ;
226           MPI_Status* status= new MPI_Status[size-1] ;
227
228           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
229           MPI_Waitall(size-1,requests,status) ;
230
231           finished=true ;
232           delete [] requests ;
233           delete [] status ;
234         }
235     }
236
237
238     void CServer::listenRootFinalize()
239     {
240        int flag ;
241        MPI_Status status ;
242        int msg ;
243
244        traceOff() ;
245        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
246        traceOn() ;
247        if (flag==true)
248        {
249           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
250           finished=true ;
251        }
252      }
253
254     void CServer::listenContext(void)
255     {
256
257       MPI_Status status ;
258       int flag ;
259       static char* buffer ;
260       static MPI_Request request ;
261       static bool recept=false ;
262       int rank ;
263       int count ;
264
265       if (recept==false)
266       {
267         traceOff() ;
268         #ifdef _usingEP
269         MPI_Iprobe(-1,1,CXios::globalComm, &flag, &status) ;
270         #else
271         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
272         #endif
273         traceOn() ;
274         
275         if (flag==true)
276         {
277           #ifdef _usingMPI
278           rank=status.MPI_SOURCE ;
279           #elif _usingEP
280           rank= status.ep_src ;
281           #endif
282           MPI_Get_count(&status,MPI_CHAR,&count) ;
283           buffer=new char[count] ;
284           MPI_Irecv((void*)buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
285           recept=true ;
286         }
287       }
288       else
289       {
290         traceOff() ;
291         MPI_Test(&request,&flag,&status) ;
292         traceOn() ;
293         if (flag==true)
294         {
295           #ifdef _usingMPI
296           rank=status.MPI_SOURCE ;
297           #elif _usingEP
298           rank= status.ep_src ;
299           #endif
300           MPI_Get_count(&status,MPI_CHAR,&count) ;
301           recvContextMessage((void*)buffer,count) ;
302           delete [] buffer ;
303           recept=false ;
304         }
305       }
306     }
307
308     void CServer::recvContextMessage(void* buff,int count)
309     {
310       static map<string,contextMessage> recvContextId ;
311       map<string,contextMessage>::iterator it ;
312
313       CBufferIn buffer(buff,count) ;
314       string id ;
315       int clientLeader ;
316       int nbMessage ;
317
318       buffer>>id>>nbMessage>>clientLeader ;
319
320       it=recvContextId.find(id) ;
321       if (it==recvContextId.end())
322       {
323         contextMessage msg={0,0} ;
324         pair<map<string,contextMessage>::iterator,bool> ret ;
325         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
326         it=ret.first ;
327       }
328       it->second.nbRecv+=1 ;
329       it->second.leaderRank+=clientLeader ;
330
331       if (it->second.nbRecv==nbMessage)
332       {
333         int size ;
334         MPI_Comm_size(intraComm,&size) ;
335         MPI_Request* requests= new MPI_Request[size-1] ;
336         MPI_Status* status= new MPI_Status[size-1] ;
337
338         for(int i=1;i<size;i++)
339         {
340            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
341         }
342         
343         MPI_Waitall(size-1,requests,status) ;
344         registerContext(buff,count,it->second.leaderRank) ;
345
346         recvContextId.erase(it) ;
347         delete [] requests ;
348         delete [] status ;
349
350       }
351     }
352
353     void CServer::listenRootContext(void)
354     {
355
356       MPI_Status status ;
357       int flag ;
358       static char* buffer ;
359       static MPI_Request request ;
360       static bool recept=false ;
361       int rank ;
362       int count ;
363       const int root=0 ;
364
365       if (recept==false)
366       {
367         traceOff() ;
368         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
369         traceOn() ;
370         if (flag==true)
371         {
372           MPI_Get_count(&status,MPI_CHAR,&count) ;
373           buffer=new char[count] ;
374           MPI_Irecv((void*)buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
375           recept=true ;
376         }
377       }
378       else
379       {
380         MPI_Test(&request,&flag,&status) ;
381         if (flag==true)
382         {
383           MPI_Get_count(&status,MPI_CHAR,&count) ;
384           registerContext((void*)buffer,count) ;
385           delete [] buffer ;
386           recept=false ;
387         }
388       }
389     }
390
391     void CServer::registerContext(void* buff, int count, int leaderRank)
392     {
393       string contextId;
394       CBufferIn buffer(buff, count);
395       buffer >> contextId;
396
397       info(20) << "CServer : Register new Context : " << contextId << endl;
398
399       if (contextList.find(contextId) != contextList.end())
400         ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
401               << "Context '" << contextId << "' has already been registred");
402
403       MPI_Comm contextIntercomm;
404       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm);
405
406       MPI_Comm inter;
407       MPI_Intercomm_merge(contextIntercomm,1,&inter);
408       MPI_Barrier(inter);
409
410       CContext* context=CContext::create(contextId);
411       contextList[contextId]=context;
412       context->initServer(intraComm,contextIntercomm);
413
414       contextInterComms.push_back(contextIntercomm);
415       MPI_Comm_free(&inter);
416     }
417
418     void CServer::contextEventLoop(void)
419     {
420       bool finished ;
421       map<string,CContext*>::iterator it ;
422       for(it=contextList.begin();it!=contextList.end();++it)
423       {
424         finished=it->second->checkBuffersAndListen();
425         if (finished)
426         {
427           contextList.erase(it) ;
428           break ;
429         }
430       }
431
432     }
433
434     //! Get rank of the current process
435     int CServer::getRank()
436     {
437       return rank;
438     }
439
440    /*!
441    * Open a file specified by a suffix and an extension and use it for the given file buffer.
442    * The file name will be suffix+rank+extension.
443    *
444    * \param fileName[in] protype file name
445    * \param ext [in] extension of the file
446    * \param fb [in/out] the file buffer
447    */
448    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
449    {
450      StdStringStream fileNameClient;
451      int numDigit = 0;
452      int size = 0;
453      MPI_Comm_size(CXios::globalComm, &size);
454      while (size)
455      {
456        size /= 10;
457        ++numDigit;
458      }
459
460      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext;
461      fb->open(fileNameClient.str().c_str(), std::ios::out);
462      if (!fb->is_open())
463        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
464              << std::endl << "Can not open <" << fileNameClient << "> file to write the server log(s).");
465    }
466
467    /*!
468    * \brief Open a file stream to write the info logs
469    * Open a file stream with a specific file name suffix+rank
470    * to write the info logs.
471    * \param fileName [in] protype file name
472    */
473    void CServer::openInfoStream(const StdString& fileName)
474    {
475      std::filebuf* fb = m_infoStream.rdbuf();
476      openStream(fileName, ".out", fb);
477
478      info.write2File(fb);
479      report.write2File(fb);
480    }
481
482    //! Write the info logs to standard output
483    void CServer::openInfoStream()
484    {
485      info.write2StdOut();
486      report.write2StdOut();
487    }
488
489    //! Close the info logs file if it opens
490    void CServer::closeInfoStream()
491    {
492      if (m_infoStream.is_open()) m_infoStream.close();
493    }
494
495    /*!
496    * \brief Open a file stream to write the error log
497    * Open a file stream with a specific file name suffix+rank
498    * to write the error log.
499    * \param fileName [in] protype file name
500    */
501    void CServer::openErrorStream(const StdString& fileName)
502    {
503      std::filebuf* fb = m_errorStream.rdbuf();
504      openStream(fileName, ".err", fb);
505
506      error.write2File(fb);
507    }
508
509    //! Write the error log to standard error output
510    void CServer::openErrorStream()
511    {
512      error.write2StdErr();
513    }
514
515    //! Close the error log file if it opens
516    void CServer::closeErrorStream()
517    {
518      if (m_errorStream.is_open()) m_errorStream.close();
519    }
520}
Note: See TracBrowser for help on using the repository browser.