source: XIOS/trunk/src/server.cpp @ 501

Last change on this file since 501 was 501, checked in by ymipsl, 10 years ago

Add licence copyright to all file ond directory src using the command :
svn propset -R copyright -F header_licence src

XIOS is now officialy under CeCILL licence

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 12.2 KB
Line 
1#include "globalScopeData.hpp"
2#include "xmlioserver_spl.hpp"
3#include "cxios.hpp"
4#include "server.hpp"
5#include "type.hpp"
6#include "context.hpp"
7#include "object_template.hpp"
8#include "oasis_cinterface.hpp"
9#include <boost/functional/hash.hpp>
10#include <boost/algorithm/string.hpp>
11#include "mpi.hpp"
12#include "tracer.hpp"
13#include "timer.hpp"
14#include "event_scheduler.hpp"
15
16namespace xios
17{
18    MPI_Comm CServer::intraComm ;
19    list<MPI_Comm> CServer::interComm ;
20    bool CServer::isRoot ;
21    int CServer::rank = INVALID_RANK;
22    StdOFStream CServer::m_infoStream;
23    map<string,CContext*> CServer::contextList ;
24    bool CServer::finished=false ;
25    bool CServer::is_MPI_Initialized ;
26    CEventScheduler* CServer::eventScheduler ;
27
28    void CServer::initialize(void)
29    {
30      int initialized ;
31      MPI_Initialized(&initialized) ;
32      if (initialized) is_MPI_Initialized=true ;
33      else is_MPI_Initialized=false ;
34
35      // Not using OASIS
36      if (!CXios::usingOasis)
37      {
38
39        if (!is_MPI_Initialized)
40        {
41          int argc=0;
42          char** argv=NULL;
43          MPI_Init(&argc,&argv) ;
44        }
45        CTimer::get("XIOS").resume() ;
46
47        boost::hash<string> hashString ;
48
49        unsigned long hashServer=hashString(CXios::xiosCodeId) ;
50        unsigned long* hashAll ;
51
52//        int rank ;
53        int size ;
54        int myColor ;
55        int i,c ;
56        MPI_Comm newComm ;
57
58        MPI_Comm_size(CXios::globalComm,&size) ;
59        MPI_Comm_rank(CXios::globalComm,&rank);
60        hashAll=new unsigned long[size] ;
61
62        MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
63
64        map<unsigned long, int> colors ;
65        map<unsigned long, int> leaders ;
66        map<unsigned long, int>::iterator it ;
67
68        for(i=0,c=0;i<size;i++)
69        {
70          if (colors.find(hashAll[i])==colors.end())
71          {
72            colors[hashAll[i]]=c ;
73            leaders[hashAll[i]]=i ;
74            c++ ;
75          }
76        }
77
78        myColor=colors[hashServer] ;
79        MPI_Comm_split(MPI_COMM_WORLD,myColor,rank,&intraComm) ;
80
81        int serverLeader=leaders[hashServer] ;
82        int clientLeader;
83
84         serverLeader=leaders[hashServer] ;
85         for(it=leaders.begin();it!=leaders.end();it++)
86         {
87           if (it->first!=hashServer)
88           {
89             clientLeader=it->second ;
90             int intraCommSize, intraCommRank ;
91             MPI_Comm_size(intraComm,&intraCommSize) ;
92             MPI_Comm_rank(intraComm,&intraCommRank) ;
93             info(50)<<"intercommCreate::server "<<rank<<" intraCommSize : "<<intraCommSize
94                     <<" intraCommRank :"<<intraCommRank<<"  clientLeader "<< clientLeader<<endl ;
95
96             MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
97             interComm.push_back(newComm) ;
98           }
99         }
100
101         delete [] hashAll ;
102      }
103      // using OASIS
104      else
105      {
106//        int rank ,size;
107        int size;
108        if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId);
109
110        CTimer::get("XIOS").resume() ;
111        oasis_get_localcomm(intraComm) ;
112        MPI_Comm_rank(intraComm,&rank) ;
113        MPI_Comm_size(intraComm,&size) ;
114        string codesId=CXios::getin<string>("oasis_codes_id") ;
115
116        vector<string> splitted ;
117        boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ;
118        vector<string>::iterator it ;
119
120        MPI_Comm newComm ;
121        int globalRank ;
122        MPI_Comm_rank(CXios::globalComm,&globalRank);
123
124        for(it=splitted.begin();it!=splitted.end();it++)
125        {
126          oasis_get_intercomm(newComm,*it) ;
127          if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ;
128          MPI_Comm_remote_size(newComm,&size);
129          interComm.push_back(newComm) ;
130        }
131              oasis_enddef() ;
132      }
133
134//      int rank;
135      MPI_Comm_rank(intraComm,&rank) ;
136      if (rank==0) isRoot=true;
137      else isRoot=false;
138     
139      eventScheduler = new CEventScheduler(intraComm) ;
140    }
141
142    void CServer::finalize(void)
143    {
144      CTimer::get("XIOS").suspend() ;
145     
146      delete eventScheduler ;
147     
148      if (!is_MPI_Initialized)
149      {
150        if (CXios::usingOasis) oasis_finalize();
151        else MPI_Finalize() ;
152      }
153      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl  ;
154      report(0)<<"Performance report : Time spent in processing events : "<<CTimer::get("Process events").getCumulatedTime()<<endl  ;
155      report(0)<<"Performance report : Ratio : "<<CTimer::get("Process events").getCumulatedTime()/CTimer::get("XIOS server").getCumulatedTime()*100.<<"%"<<endl  ;
156    }
157
158     void CServer::eventLoop(void)
159     {
160       bool stop=false ;
161
162       CTimer::get("XIOS server").resume() ;
163       while(!stop)
164       {
165         if (isRoot)
166         {
167           listenContext();
168           if (!finished) listenFinalize() ;
169         }
170         else
171         {
172           listenRootContext();
173           if (!finished) listenRootFinalize() ;
174         }
175
176         contextEventLoop() ;
177         if (finished && contextList.empty()) stop=true ;
178         if (! CXios::isServer) eventScheduler->checkEvent() ;
179       }
180       CTimer::get("XIOS server").suspend() ;
181     }
182
183     void CServer::listenFinalize(void)
184     {
185        list<MPI_Comm>::iterator it;
186        int msg ;
187        int flag ;
188
189        for(it=interComm.begin();it!=interComm.end();it++)
190        {
191           MPI_Status status ;
192           traceOff() ;
193           MPI_Iprobe(0,0,*it,&flag,&status) ;
194           traceOn() ;
195           if (flag==true)
196           {
197              MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
198              info(20)<<" CServer : Receive client finalize"<<endl ;
199              interComm.erase(it) ;
200              break ;
201            }
202         }
203
204         if (interComm.empty())
205         {
206           int i,size ;
207           MPI_Comm_size(intraComm,&size) ;
208           MPI_Request* requests= new MPI_Request[size-1] ;
209           MPI_Status* status= new MPI_Status[size-1] ;
210
211           for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
212           MPI_Waitall(size-1,requests,status) ;
213
214           finished=true ;
215           delete [] requests ;
216           delete [] status ;
217         }
218     }
219
220
221     void CServer::listenRootFinalize()
222     {
223        int flag ;
224        MPI_Status status ;
225        int msg ;
226
227        traceOff() ;
228        MPI_Iprobe(0,4,intraComm, &flag, &status) ;
229        traceOn() ;
230        if (flag==true)
231        {
232           MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
233           finished=true ;
234        }
235      }
236
237     void CServer::listenContext(void)
238     {
239
240       MPI_Status status ;
241       int flag ;
242       static void* buffer ;
243       static MPI_Request request ;
244       static bool recept=false ;
245       int rank ;
246       int count ;
247
248       if (recept==false)
249       {
250         traceOff() ;
251         MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
252         traceOn() ;
253         if (flag==true)
254         {
255           rank=status.MPI_SOURCE ;
256           MPI_Get_count(&status,MPI_CHAR,&count) ;
257           buffer=new char[count] ;
258           MPI_Irecv(buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
259           recept=true ;
260         }
261       }
262       else
263       {
264         traceOff() ;
265         MPI_Test(&request,&flag,&status) ;
266         traceOn() ;
267         if (flag==true)
268         {
269           rank=status.MPI_SOURCE ;
270           MPI_Get_count(&status,MPI_CHAR,&count) ;
271           recvContextMessage(buffer,count) ;
272           delete [] buffer ;
273           recept=false ;
274         }
275       }
276     }
277
278     void CServer::recvContextMessage(void* buff,int count)
279     {
280       static map<string,contextMessage> recvContextId ;
281       map<string,contextMessage>::iterator it ;
282
283       CBufferIn buffer(buff,count) ;
284       string id ;
285       int clientLeader ;
286       int nbMessage ;
287
288       buffer>>id>>nbMessage>>clientLeader ;
289
290       it=recvContextId.find(id) ;
291       if (it==recvContextId.end())
292       {
293         contextMessage msg={0,0} ;
294         pair<map<string,contextMessage>::iterator,bool> ret ;
295         ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
296         it=ret.first ;
297       }
298       it->second.nbRecv+=1 ;
299       it->second.leaderRank+=clientLeader ;
300
301       if (it->second.nbRecv==nbMessage)
302       {
303         int size ;
304         MPI_Comm_size(intraComm,&size) ;
305         MPI_Request* requests= new MPI_Request[size-1] ;
306         MPI_Status* status= new MPI_Status[size-1] ;
307
308         for(int i=1;i<size;i++)
309         {
310            MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
311         }
312         MPI_Waitall(size-1,requests,status) ;
313         registerContext(buff,count,it->second.leaderRank) ;
314
315         recvContextId.erase(it) ;
316         delete [] requests ;
317         delete [] status ;
318
319       }
320     }
321
322     void CServer::listenRootContext(void)
323     {
324
325       MPI_Status status ;
326       int flag ;
327       static void* buffer ;
328       static MPI_Request request ;
329       static bool recept=false ;
330       int rank ;
331       int count ;
332       const int root=0 ;
333
334       if (recept==false)
335       {
336         traceOff() ;
337         MPI_Iprobe(root,2,intraComm, &flag, &status) ;
338         traceOn() ;
339         if (flag==true)
340         {
341           MPI_Get_count(&status,MPI_CHAR,&count) ;
342           buffer=new char[count] ;
343           MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
344           recept=true ;
345         }
346       }
347       else
348       {
349         MPI_Test(&request,&flag,&status) ;
350         if (flag==true)
351         {
352           MPI_Get_count(&status,MPI_CHAR,&count) ;
353           registerContext(buffer,count) ;
354           delete [] buffer ;
355           recept=false ;
356         }
357       }
358     }
359
360
361
362     void CServer::registerContext(void* buff,int count, int leaderRank)
363     {
364
365       string contextId;
366       CBufferIn buffer(buff,count) ;
367
368       buffer>>contextId ;
369       MPI_Comm contextIntercomm ;
370       MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm) ;
371
372       info(20)<<"CServer : Register new Context : "<<contextId<<endl  ;
373       MPI_Comm inter ;
374       MPI_Intercomm_merge(contextIntercomm,1,&inter) ;
375       MPI_Barrier(inter) ;
376       if (contextList.find(contextId)!=contextList.end())
377        ERROR("void CServer::registerContext(void* buff,int count, int leaderRank)",
378              <<"Context has already been registred") ;
379
380      CContext* context=CContext::create(contextId) ;
381      contextList[contextId]=context ;
382      context->initServer(intraComm,contextIntercomm) ;
383
384     }
385
386
387     void CServer::contextEventLoop(void)
388     {
389       bool finished ;
390       map<string,CContext*>::iterator it ;
391       for(it=contextList.begin();it!=contextList.end();it++)
392       {
393         finished=it->second->eventLoop() ;
394         if (finished)
395         {
396           contextList.erase(it) ;
397           break ;
398         }
399       }
400
401     }
402
403     //! Get rank of the current process
404     int CServer::getRank()
405     {
406       return rank;
407     }
408
409     /*!
410      * \brief Open file stream to write in
411      *   Opening a file stream with a specific file name suffix-server+rank
412      * \param [in] protype file name
413     */
414     void CServer::openInfoStream(const StdString& fileName)
415     {
416       std::filebuf* fb = m_infoStream.rdbuf();
417       StdStringStream fileNameServer;
418       int numDigit = 0;
419       int size = 0;
420       MPI_Comm_size(CXios::globalComm, &size);
421       while (size)
422       {
423         size /= 10;
424         ++numDigit;
425       }
426
427       fileNameServer << fileName <<"_" << std::setfill('0') << std::setw(numDigit) << getRank() << ".out";
428       fb->open(fileNameServer.str().c_str(), std::ios::out);
429       if (!fb->is_open())
430       ERROR("void CServer::openInfoStream(const StdString& fileName)",
431            <<endl<< "Can not open <"<<fileNameServer<<"> file to write" );
432
433       info.write2File(fb);
434       report.write2File(fb);
435     }
436
437     //! Open stream for standard output
438     void CServer::openInfoStream()
439     {
440       info.write2StdOut();
441       report.write2StdOut();
442     }
443
444     //! Close opening stream
445     void CServer::closeInfoStream()
446     {
447       if (m_infoStream.is_open()) m_infoStream.close();
448     }
449
450}
Note: See TracBrowser for help on using the repository browser.