XIOS  1.0
Xml I/O Server
 Tout Classes Espaces de nommage Fichiers Fonctions Variables Définitions de type Énumérations Valeurs énumérées Amis Macros
client.cpp
Aller à la documentation de ce fichier.
1 #include "globalScopeData.hpp"
2 #include "xios_spl.hpp"
3 #include "cxios.hpp"
4 #include "client.hpp"
5 #include <boost/functional/hash.hpp>
6 #include "type.hpp"
7 #include "context.hpp"
8 #include "context_client.hpp"
9 #include "oasis_cinterface.hpp"
10 #include "mpi.hpp"
11 #include "timer.hpp"
12 #include "buffer_client.hpp"
13 #include "string_tools.hpp"
14 
15 namespace xios
16 {
17 
 // Intra-communicator gathering the client (model) processes of this executable.
 MPI_Comm CClient::intraComm ;
 // Inter-communicator linking the clients to the XIOS server processes
 // (a duplicate of intraComm when running without a server).
 MPI_Comm CClient::interComm ;
 // One inter-communicator per registered context; all freed in finalize().
 std::list<MPI_Comm> CClient::contextInterComms;
 //! Accessor to the client/server inter-communicator.
 MPI_Comm& CClient::getInterComm(void) { return (interComm); }
27 
29 
37  void CClient::initialize(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm)
38  {
39  int initialized ;
40  MPI_Initialized(&initialized) ;
41  if (initialized) is_MPI_Initialized=true ;
42  else is_MPI_Initialized=false ;
43  int rank ;
44 
45 // don't use OASIS
46  if (!CXios::usingOasis)
47  {
48 // localComm isn't given
49  if (localComm == MPI_COMM_NULL)
50  {
51  if (!is_MPI_Initialized)
52  {
53  MPI_Init(NULL, NULL);
54  }
55  CTimer::get("XIOS").resume() ;
56  CTimer::get("XIOS init/finalize").resume() ;
57  boost::hash<string> hashString ;
58 
59  unsigned long hashClient=hashString(codeId) ;
60  unsigned long hashServer=hashString(CXios::xiosCodeId) ;
61  unsigned long* hashAll ;
62  int size ;
63  int myColor ;
64  int i,c ;
65  MPI_Comm newComm ;
66 
67  MPI_Comm_size(CXios::globalComm,&size) ;
68  MPI_Comm_rank(CXios::globalComm,&rank_);
69 
70  hashAll=new unsigned long[size] ;
71 
72  MPI_Allgather(&hashClient,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ;
73 
74  map<unsigned long, int> colors ;
75  map<unsigned long, int> leaders ;
76 
77  for(i=0,c=0;i<size;i++)
78  {
79  if (colors.find(hashAll[i])==colors.end())
80  {
81  colors[hashAll[i]] =c ;
82  leaders[hashAll[i]]=i ;
83  c++ ;
84  }
85  }
86 
87  // Verify whether we are on server mode or not
89  for (i=0; i < size; ++i)
90  {
91  if (hashServer == hashAll[i])
92  {
94  break;
95  }
96  }
97 
98  myColor=colors[hashClient];
99  MPI_Comm_split(CXios::globalComm,myColor,rank_,&intraComm) ;
100 
101  if (CXios::usingServer)
102  {
103  int clientLeader=leaders[hashClient] ;
104  serverLeader=leaders[hashServer] ;
105  int intraCommSize, intraCommRank ;
106  MPI_Comm_size(intraComm,&intraCommSize) ;
107  MPI_Comm_rank(intraComm,&intraCommRank) ;
108  info(50)<<"intercommCreate::client "<<rank_<<" intraCommSize : "<<intraCommSize
109  <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< serverLeader<<endl ;
110  MPI_Intercomm_create(intraComm, 0, CXios::globalComm, serverLeader, 0, &interComm) ;
111  //rank_ = intraCommRank;
112  }
113  else
114  {
115  MPI_Comm_dup(intraComm,&interComm) ;
116  }
117  delete [] hashAll ;
118  }
119  // localComm argument is given
120  else
121  {
122  if (CXios::usingServer)
123  {
124  //ERROR("void CClient::initialize(const string& codeId,MPI_Comm& localComm,MPI_Comm& returnComm)", << " giving a local communictor is not compatible with using server mode") ;
125  }
126  else
127  {
128  MPI_Comm_dup(localComm,&intraComm) ;
129  MPI_Comm_dup(intraComm,&interComm) ;
130  }
131  }
132  }
133  // using OASIS
134  else
135  {
136  // localComm isn't given
137  if (localComm == MPI_COMM_NULL)
138  {
139  if (!is_MPI_Initialized) oasis_init(codeId) ;
140  oasis_get_localcomm(localComm) ;
141  }
142  MPI_Comm_dup(localComm,&intraComm) ;
143 
144  CTimer::get("XIOS").resume() ;
145  CTimer::get("XIOS init/finalize").resume() ;
146 
147  if (CXios::usingServer)
148  {
149  MPI_Status status ;
150  MPI_Comm_rank(intraComm,&rank_) ;
151 
153  if (rank_==0) MPI_Recv(&serverLeader,1, MPI_INT, 0, 0, interComm, &status) ;
154  MPI_Bcast(&serverLeader,1,MPI_INT,0,intraComm) ;
155  }
156  else MPI_Comm_dup(intraComm,&interComm) ;
157  }
158 
159  MPI_Comm_dup(intraComm,&returnComm) ;
160  }
161 
163 
170  void CClient::registerContext(const string& id, MPI_Comm contextComm)
171  {
173  CContext* context=CContext::create(id);
174  StdString idServer(id);
175  idServer += "_server";
176 
177  if (CXios::isServer && !context->hasServer)
178  // Attached mode
179  {
180  MPI_Comm contextInterComm ;
181  MPI_Comm_dup(contextComm,&contextInterComm) ;
182  CContext* contextServer = CContext::create(idServer);
183 
184  // Firstly, initialize context on client side
185  context->initClient(contextComm,contextInterComm, contextServer);
186 
187  // Secondly, initialize context on server side
188  contextServer->initServer(contextComm,contextInterComm, context);
189 
190  // Finally, we should return current context to context client
192 
193  contextInterComms.push_back(contextInterComm);
194  }
195  else
196  {
197  int size,rank,globalRank ;
198  size_t message_size ;
199  int leaderRank ;
200  MPI_Comm contextInterComm ;
201 
202  MPI_Comm_size(contextComm,&size) ;
203  MPI_Comm_rank(contextComm,&rank) ;
204  MPI_Comm_rank(CXios::globalComm,&globalRank) ;
205  if (rank!=0) globalRank=0 ;
206 
207  CMessage msg ;
208  msg<<idServer<<size<<globalRank ;
209 // msg<<id<<size<<globalRank ;
210 
211  int messageSize=msg.size() ;
212  char * buff = new char[messageSize] ;
213  CBufferOut buffer((void*)buff,messageSize) ;
214  buffer<<msg ;
215 
216  MPI_Send((void*)buff,buffer.count(),MPI_CHAR,serverLeader,1,CXios::globalComm) ;
217 
218  MPI_Intercomm_create(contextComm,0,CXios::globalComm,serverLeader,10+globalRank,&contextInterComm) ;
219  info(10)<<"Register new Context : "<<id<<endl ;
220  MPI_Comm inter ;
221  MPI_Intercomm_merge(contextInterComm,0,&inter) ;
222  MPI_Barrier(inter) ;
223 
224  context->initClient(contextComm,contextInterComm) ;
225 
226  contextInterComms.push_back(contextInterComm);
227  MPI_Comm_free(&inter);
228  delete [] buff ;
229 
230  }
231  }
232 
239  {
240  bool oasisEnddef=CXios::getin<bool>("call_oasis_enddef",true) ;
241  if (!oasisEnddef) ERROR("void CClient::callOasisEnddef(void)", <<"Function xios_oasis_enddef called but variable <call_oasis_enddef> is set to false."<<endl
242  <<"Variable <call_oasis_enddef> must be set to true"<<endl) ;
243  if (CXios::isServer)
244  // Attached mode
245  {
246  // nothing to do
247  }
248  else
249  {
250  int rank ;
251  int msg=0 ;
252 
253  MPI_Comm_rank(intraComm,&rank) ;
254  if (rank==0)
255  {
256  MPI_Send(&msg,1,MPI_INT,0,5,interComm) ; // tags oasis_endded = 5
257  }
258 
259  }
260  }
261 
262 
263  void CClient::finalize(void)
264  {
265  int rank ;
266  int msg=0 ;
267 
268  MPI_Comm_rank(intraComm,&rank) ;
269 
270  if (!CXios::isServer)
271  {
272  MPI_Comm_rank(intraComm,&rank) ;
273  if (rank==0)
274  {
275  MPI_Send(&msg,1,MPI_INT,0,0,interComm) ;
276  }
277  }
278 
279  for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
280  MPI_Comm_free(&(*it));
281  MPI_Comm_free(&interComm);
282  MPI_Comm_free(&intraComm);
283 
284  CTimer::get("XIOS init/finalize").suspend() ;
285  CTimer::get("XIOS").suspend() ;
286 
287  if (!is_MPI_Initialized)
288  {
290  else MPI_Finalize() ;
291  }
292 
293  info(20) << "Client side context is finalized"<<endl ;
294  report(0) <<" Performance report : Whole time from XIOS init and finalize: "<< CTimer::get("XIOS init/finalize").getCumulatedTime()<<" s"<<endl ;
295  report(0) <<" Performance report : total time spent for XIOS : "<< CTimer::get("XIOS").getCumulatedTime()<<" s"<<endl ;
296  report(0)<< " Performance report : time spent for waiting free buffer : "<< CTimer::get("Blocking time").getCumulatedTime()<<" s"<<endl ;
297  report(0)<< " Performance report : Ratio : "<< CTimer::get("Blocking time").getCumulatedTime()/CTimer::get("XIOS init/finalize").getCumulatedTime()*100.<<" %"<<endl ;
298  report(0)<< " Performance report : This ratio must be close to zero. Otherwise it may be usefull to increase buffer size or numbers of server"<<endl ;
299 // report(0)<< " Memory report : Current buffer_size : "<<CXios::bufferSize<<endl ;
300  report(0)<< " Memory report : Minimum buffer size required : " << CClientBuffer::maxRequestSize << " bytes" << endl ;
301  report(0)<< " Memory report : increasing it by a factor will increase performance, depending of the volume of data wrote in file at each time step of the file"<<endl ;
302  report(100)<<CTimer::getAllCumulatedTime()<<endl ;
303  }
304 
309  {
310  return rank_;
311  }
312 
321  void CClient::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
322  {
323  StdStringStream fileNameClient;
324  int numDigit = 0;
325  int size = 0;
326  int rank;
327  MPI_Comm_size(CXios::globalComm, &size);
328  while (size)
329  {
330  size /= 10;
331  ++numDigit;
332  }
333 
334  if (CXios::usingOasis)
335  {
336  MPI_Comm_rank(CXios::globalComm,&rank);
337  fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << rank << ext;
338  }
339  else
340  fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << getRank() << ext;
341 
342 
343  fb->open(fileNameClient.str().c_str(), std::ios::out);
344  if (!fb->is_open())
345  ERROR("void CClient::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
346  << std::endl << "Can not open <" << fileNameClient.str() << "> file to write the client log(s).");
347  }
348 
355  void CClient::openInfoStream(const StdString& fileName)
356  {
357  std::filebuf* fb = m_infoStream.rdbuf();
358  openStream(fileName, ".out", fb);
359 
360  info.write2File(fb);
361  report.write2File(fb);
362  }
363 
366  {
367  info.write2StdOut();
369  }
370 
373  {
374  if (m_infoStream.is_open()) m_infoStream.close();
375  }
376 
383  void CClient::openErrorStream(const StdString& fileName)
384  {
385  std::filebuf* fb = m_errorStream.rdbuf();
386  openStream(fileName, ".err", fb);
387 
388  error.write2File(fb);
389  }
390 
393  {
395  }
396 
399  {
400  if (m_errorStream.is_open()) m_errorStream.close();
401  }
402 }
size_t count(void)
Definition: buffer_out.cpp:54
CLog report("report")
Definition: log.hpp:56
std::ofstream StdOFStream
Definition: xios_spl.hpp:44
static StdOFStream m_infoStream
Definition: client.hpp:44
static void initialize(const string &codeId, MPI_Comm &localComm, MPI_Comm &returnComm)
Definition: client.cpp:37
CLog info("info")
Definition: log.hpp:55
std::stringstream StdStringStream
Definition: xios_spl.hpp:43
static void openErrorStream()
Write the error log to standard error output.
Definition: client.cpp:392
void initServer(MPI_Comm intraComm, MPI_Comm interComm, CContext *cxtClient=0)
Definition: context.cpp:385
void oasis_finalize(void)
double getCumulatedTime(void)
Definition: timer.cpp:49
static std::list< MPI_Comm > contextInterComms
Definition: client.hpp:19
void initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext *cxtServer=0)
Definition: context.cpp:267
static void registerContext(const string &id, MPI_Comm contextComm)
Definition: client.cpp:170
void suspend(void)
Definition: timer.cpp:23
static void closeErrorStream()
Close the error log file if it opens.
Definition: client.cpp:398
std::string StdString
Definition: xios_spl.hpp:48
#define xios(arg)
static bool usingServer
Using server (server mode)
Definition: cxios.hpp:46
void write2StdOut()
Write log into standard output.
Definition: log.hpp:37
static MPI_Comm globalComm
Global communicator.
Definition: cxios.hpp:42
static MPI_Comm & getInterComm()
Definition: client.cpp:26
static void setCurrent(const string &id)
Set context with an id be the current context.
Definition: context.cpp:2029
void oasis_get_intercomm(MPI_Comm &comm_client_server, const std::string &server_id)
static void setNotUsingServer()
Setting xios NOT to use server mode.
Definition: cxios.cpp:303
CATCH CScalarAlgorithmReduceScalar::CScalarAlgorithmReduceScalar(CScalar *scalarDestination, CScalar *scalarSource, CReduceScalarToScalar *algo ERROR)("CScalarAlgorithmReduceScalar::CScalarAlgorithmReduceScalar(CScalar* scalarDestination, CScalar* scalarSource, CReduceScalarToScalar* algo)",<< "Operation must be defined."<< "Scalar source "<< scalarSource->getId()<< std::endl<< "Scalar destination "<< scalarDestination->getId())
static void openStream(const StdString &fileName, const StdString &ext, std::filebuf *fb)
Open a file specified by a suffix and an extension and use it for the given file buffer.
Definition: client.cpp:321
static CContext * create(const string &id="")
Create a context with specific id.
Definition: context.cpp:2042
static int serverLeader
Definition: client.hpp:20
static string xiosCodeId
Identity for XIOS.
Definition: cxios.hpp:30
static void openInfoStream()
Write the info logs to standard output.
Definition: client.cpp:365
CLog error("error", cerr.rdbuf())
Definition: log.hpp:57
static int getRank()
Get global rank without oasis and current rank in model intraComm in case of oasis.
Definition: client.cpp:308
static bool isServer
Check if xios is server.
Definition: cxios.hpp:40
static bool usingOasis
Using Oasis.
Definition: cxios.hpp:45
static void finalize(void)
Definition: client.cpp:263
virtual size_t size(void) const
Definition: message.cpp:24
void oasis_init(const std::string &server_id)
void resume(void)
Definition: timer.cpp:33
static MPI_Comm interComm
Definition: client.hpp:18
void write2StdErr()
Write log into standard error output.
Definition: log.hpp:40
static void callOasisEnddef(void)
Send the order to the servers to call "oasis_enddef".
Definition: client.cpp:238
static CTimer & get(std::string name)
Definition: timer.cpp:54
static void setUsingServer()
Setting xios to use server mode.
Definition: cxios.cpp:297
static MPI_Comm intraComm
Definition: client.hpp:17
static size_t maxRequestSize
static bool is_MPI_Initialized
Definition: client.hpp:21
void write2File(std::streambuf *sBuff)
Write log into a file with its streambuf.
Definition: log.hpp:34
static void closeInfoStream()
Close the info logs file if it opens.
Definition: client.cpp:372
static int rank_
Rank in model intraComm.
Definition: client.hpp:43
static StdOFStream m_errorStream
Definition: client.hpp:45
static std::string getAllCumulatedTime(void)
Definition: timer.cpp:62
const int INVALID_RANK
void oasis_get_localcomm(MPI_Comm &comm)