XIOS 1.0
Xml I/O Server
context_client.cpp
#include "xios_spl.hpp"
#include "context_client.hpp"
#include "context_server.hpp"
#include "event_client.hpp"
#include "buffer_out.hpp"
#include "buffer_client.hpp"
#include "type.hpp"
#include "context.hpp"
#include "mpi.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "server.hpp"

namespace xios
{
  /*!
   * Constructor: bind the client to its context and determine the size of the
   * client and server groups from the intra- and inter-communicators.
   */
  CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
   : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4)
  {
    context = parent;
    intraComm = intraComm_;
    interComm = interComm_;
    MPI_Comm_rank(intraComm, &clientRank);
    MPI_Comm_size(intraComm, &clientSize);

    int flag;
    MPI_Comm_test_inter(interComm, &flag);
    if (flag) MPI_Comm_remote_size(interComm, &serverSize);
    else MPI_Comm_size(interComm, &serverSize);

    computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader);

    timeLine = 0;
  }

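The size logic above hinges on whether interComm is a genuine inter-communicator: if it is, the server group is the remote group; otherwise client and server share one group. Here is a minimal standalone sketch of the same test (illustrative, not part of XIOS; MPI_COMM_WORLD stands in for interComm):

  // Hypothetical demo: query group sizes the same way the constructor does.
  #include <mpi.h>
  #include <iostream>

  int main(int argc, char** argv)
  {
    MPI_Init(&argc, &argv);
    MPI_Comm comm = MPI_COMM_WORLD; // stands in for interComm

    int flag, serverSize;
    MPI_Comm_test_inter(comm, &flag);
    if (flag) MPI_Comm_remote_size(comm, &serverSize); // inter-comm: size of the remote (server) group
    else      MPI_Comm_size(comm, &serverSize);        // intra-comm: clients and servers share the group

    std::cout << (flag ? "inter" : "intra") << "-communicator, server group size "
              << serverSize << std::endl;
    MPI_Finalize();
    return 0;
  }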
  /*!
   * Distribute the server ranks among the clients: each client becomes the
   * "leader" of the servers listed in rankRecvLeader and is merely connected
   * to those listed in rankRecvNotLeader.
   */
  void CContextClient::computeLeader(int clientRank, int clientSize, int serverSize,
                                     std::list<int>& rankRecvLeader,
                                     std::list<int>& rankRecvNotLeader)
  {
    if ((0 == clientSize) || (0 == serverSize)) return;

    if (clientSize < serverSize)
    {
      int serverByClient = serverSize / clientSize;
      int remain = serverSize % clientSize;
      int rankStart = serverByClient * clientRank;

      if (clientRank < remain)
      {
        serverByClient++;
        rankStart += clientRank;
      }
      else
        rankStart += remain;

      for (int i = 0; i < serverByClient; i++)
        rankRecvLeader.push_back(rankStart + i);

      rankRecvNotLeader.resize(0);
    }
    else
    {
      int clientByServer = clientSize / serverSize;
      int remain = clientSize % serverSize;

      if (clientRank < (clientByServer + 1) * remain)
      {
        if (clientRank % (clientByServer + 1) == 0)
          rankRecvLeader.push_back(clientRank / (clientByServer + 1));
        else
          rankRecvNotLeader.push_back(clientRank / (clientByServer + 1));
      }
      else
      {
        int rank = clientRank - (clientByServer + 1) * remain;
        if (rank % clientByServer == 0)
          rankRecvLeader.push_back(remain + rank / clientByServer);
        else
          rankRecvNotLeader.push_back(remain + rank / clientByServer);
      }
    }
  }

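To make the mapping concrete, the following self-contained sketch (hypothetical, not part of XIOS) replays the branch used when clients outnumber servers: with 8 clients and 3 servers, clients 0, 3 and 6 become leaders of servers 0, 1 and 2, and the other clients attach to those servers as non-leaders.

  // Hypothetical standalone demo of the clientSize >= serverSize branch above.
  #include <iostream>
  #include <list>

  static void computeLeaderDemo(int clientRank, int clientSize, int serverSize,
                                std::list<int>& leader, std::list<int>& notLeader)
  {
    int clientByServer = clientSize / serverSize;
    int remain = clientSize % serverSize;
    if (clientRank < (clientByServer + 1) * remain)
    {
      if (clientRank % (clientByServer + 1) == 0)
        leader.push_back(clientRank / (clientByServer + 1));
      else
        notLeader.push_back(clientRank / (clientByServer + 1));
    }
    else
    {
      int rank = clientRank - (clientByServer + 1) * remain;
      if (rank % clientByServer == 0)
        leader.push_back(remain + rank / clientByServer);
      else
        notLeader.push_back(remain + rank / clientByServer);
    }
  }

  int main()
  {
    // 8 clients, 3 servers: expect clients 0, 3 and 6 to lead servers 0, 1 and 2.
    for (int rank = 0; rank < 8; ++rank)
    {
      std::list<int> leader, notLeader;
      computeLeaderDemo(rank, 8, 3, leader, notLeader);
      std::cout << "client " << rank
                << (leader.empty() ? " follows server " : " leads server ")
                << (leader.empty() ? notLeader.front() : leader.front()) << std::endl;
    }
    return 0;
  }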
  /*!
   * Send an event to the connected servers. If the buffers are full on a
   * classical server, the event is kept in a temporary buffer instead of
   * blocking the client.
   */
  void CContextClient::sendEvent(CEventClient& event)
  {
    list<int> ranks = event.getRanks();
    info(100) << "Event " << timeLine << " of context " << context->getId() << endl;
    if (CXios::checkEventSync)
    {
      // Debug check: if every client contributes the same value, the sum over
      // intraComm divided by clientSize gives back the local value.
      int typeId, classId, typeId_in, classId_in;
      long long timeLine_out;
      long long timeLine_in = timeLine; // 64-bit, to match MPI_LONG_LONG_INT below
      typeId_in = event.getTypeId();
      classId_in = event.getClassId();
//    MPI_Allreduce(&timeLine_in, &timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm); // MPI_UINT64_T standardized by MPI 3
      MPI_Allreduce(&timeLine_in, &timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm);
      MPI_Allreduce(&typeId_in, &typeId, 1, MPI_INT, MPI_SUM, intraComm);
      MPI_Allreduce(&classId_in, &classId, 1, MPI_INT, MPI_SUM, intraComm);
      if (typeId/clientSize != event.getTypeId() || classId/clientSize != event.getClassId() || timeLine_out/clientSize != timeLine_in)
      {
        ERROR("void CContextClient::sendEvent(CEventClient& event)",
              << "Events are not coherent between clients.");
      }
    }

    if (!event.isEmpty())
    {
      list<int> sizes = event.getSizes();

      // We force the getBuffers call to be non-blocking on classical servers
      list<CBufferOut*> buffList;
      bool couldBuffer = getBuffers(ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0)));
//    bool couldBuffer = getBuffers(ranks, sizes, buffList, CXios::isServer);

      if (couldBuffer)
      {
        event.send(timeLine, sizes, buffList);
        info(100) << "Event " << timeLine << " of context " << context->getId() << " sent" << endl;

        checkBuffers(ranks);

        if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
        {
          waitEvent(ranks);
          CContext::setCurrent(context->getId());
        }
      }
      else
      {
        tmpBufferedEvent.ranks = ranks;
        tmpBufferedEvent.sizes = sizes;

        for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++)
          tmpBufferedEvent.buffers.push_back(new CBufferOut(*it));
        info(100) << "DEBUG : temporary event created : timeline " << timeLine << endl;
        event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers);
        info(100) << "Event " << timeLine << " of context " << context->getId() << " sent" << endl;
      }
    }

    timeLine++;
  }

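The coherence test above relies on a simple property of MPI_Allreduce: if every rank contributes the same value v, the sum equals v * clientSize. A minimal sketch of the same check outside XIOS (hypothetical; the name localId is illustrative):

  // Hypothetical MPI demo of the sum-based coherence check (compile with mpicxx).
  #include <mpi.h>
  #include <iostream>

  int main(int argc, char** argv)
  {
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    int localId = 42;             // every rank must agree on this value
    // if (rank == 1) localId = 7; // uncomment to trigger the incoherence report

    int sum;
    MPI_Allreduce(&localId, &sum, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
    if (sum / size != localId)
      std::cerr << "rank " << rank << ": ids are not coherent between ranks" << std::endl;

    MPI_Finalize();
    return 0;
  }

Since this adds three collectives per event, XIOS guards it behind the CXios::checkEventSync debug flag.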
  /*!
   * Send the temporarily buffered event, if any.
   * \return true if the event could be sent, false otherwise
   */
  bool CContextClient::sendTemporarilyBufferedEvent()
  {
    bool couldSendTmpBufferedEvent = false;

    if (hasTemporarilyBufferedEvent())
    {
      list<CBufferOut*> buffList;
      if (getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) // Non-blocking call
      {
        list<CBufferOut*>::iterator it, itBuffer;

        for (it = tmpBufferedEvent.buffers.begin(), itBuffer = buffList.begin(); it != tmpBufferedEvent.buffers.end(); it++, itBuffer++)
          (*itBuffer)->put((char*)(*it)->start(), (*it)->count());

        info(100) << "DEBUG : temporary event sent" << endl;
        checkBuffers(tmpBufferedEvent.ranks);

        tmpBufferedEvent.clear();

        couldSendTmpBufferedEvent = true;
      }
    }

    return couldSendTmpBufferedEvent;
  }

  /*!
   * If the client is also a server (attached mode), it must process the
   * incoming events right after sending its own.
   * \param [in] ranks ranks of the servers connected to this client
   */
  void CContextClient::waitEvent(list<int>& ranks)
  {
    parentServer->server->setPendingEvent();
    while (checkBuffers(ranks))
    {
      parentServer->server->listen();
      parentServer->server->checkPendingRequest();
    }

    while (parentServer->server->hasPendingEvent())
    {
      parentServer->server->eventLoop();
    }
  }

  /*!
   * Get one buffer for each connection to the given servers. This call blocks
   * until enough room is available in every buffer, unless it is explicitly
   * requested to be non-blocking.
   * \return true if the buffers could be retrieved
   */
  bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers,
                                  bool nonBlocking /*= false*/)
  {
    list<int>::const_iterator itServer, itSize;
    list<CClientBuffer*> bufferList;
    map<int,CClientBuffer*>::const_iterator it;
    list<CClientBuffer*>::iterator itBuffer;
    bool areBuffersFree;

    for (itServer = serverList.begin(); itServer != serverList.end(); itServer++)
    {
      it = buffers.find(*itServer);
      if (it == buffers.end())
      {
        newBuffer(*itServer);
        it = buffers.find(*itServer);
      }
      bufferList.push_back(it->second);
    }

    CTimer::get("Blocking time").resume();
    do
    {
      areBuffersFree = true;
      for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
        areBuffersFree &= (*itBuffer)->isBufferFree(*itSize);

      if (!areBuffersFree)
      {
        checkBuffers();
        if (CServer::serverLevel == 0)
          context->server->listen();

        else if (CServer::serverLevel == 1)
        {
          context->server->listen();
          for (int i = 0; i < context->serverPrimServer.size(); ++i)
            context->serverPrimServer[i]->listen();
          CServer::contextEventLoop(false); // avoid a deadlock at finalize
        }

        else if (CServer::serverLevel == 2)
          context->server->listen();
      }
    } while (!areBuffersFree && !nonBlocking);

    CTimer::get("Blocking time").suspend();

    if (areBuffersFree)
    {
      for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
        retBuffers.push_back((*itBuffer)->getBuffer(*itSize));
    }

    return areBuffersFree;
  }

  /*!
   * Make a new buffer for the connection to the server of the given rank.
   * \param [in] rank rank of the connected server
   */
  void CContextClient::newBuffer(int rank)
  {
    if (!mapBufferSize_.count(rank))
    {
      error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
      mapBufferSize_[rank] = CXios::minBufferSize;
      maxEventSizes[rank] = CXios::minBufferSize;
    }
    CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank], maxBufferedEvents);
    // Notify the server of the buffer size
    CBufferOut* bufOut = buffer->getBuffer(sizeof(StdSize));
    bufOut->put(mapBufferSize_[rank]);
    buffer->checkBuffer();
  }

  /*!
   * Verify the state of all buffers.
   * \return true if some data is still pending
   */
  bool CContextClient::checkBuffers(void)
  {
    map<int,CClientBuffer*>::iterator itBuff;
    bool pending = false;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      pending |= itBuff->second->checkBuffer();
    return pending;
  }

  /*!
   * Release all buffers.
   */
  void CContextClient::releaseBuffers()
  {
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
    {
      delete itBuff->second;
    }
    buffers.clear();
  }

  /*!
   * Verify the state of the buffers corresponding to the given server ranks.
   * \return true if some data is still pending
   */
  bool CContextClient::checkBuffers(list<int>& ranks)
  {
    list<int>::iterator it;
    bool pending = false;
    for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer();
    return pending;
  }

  /*!
   * Set the buffer size and the estimated maximum event size for each
   * connection to a server, then derive how many events can be buffered.
   */
  void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)
  {
    mapBufferSize_ = mapSize;
    maxEventSizes = maxEventSize;

    // Compute the maximum number of events that can be safely buffered.
    double minBufferSizeEventSizeRatio = std::numeric_limits<double>::max();
    for (std::map<int,StdSize>::const_iterator it = mapSize.begin(), ite = mapSize.end(); it != ite; ++it)
    {
      double ratio = double(it->second) / maxEventSizes[it->first];
      if (ratio < minBufferSizeEventSizeRatio) minBufferSizeEventSizeRatio = ratio;
    }
    MPI_Allreduce(MPI_IN_PLACE, &minBufferSizeEventSizeRatio, 1, MPI_DOUBLE, MPI_MIN, intraComm);

    if (minBufferSizeEventSizeRatio < 1.0)
    {
      ERROR("void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)",
            << "The buffer sizes and the maximum event sizes are incoherent.");
    }
    else if (minBufferSizeEventSizeRatio == std::numeric_limits<double>::max())
      minBufferSizeEventSizeRatio = 1.0; // in this case maxBufferedEvents will never be used, but avoid a floating point exception

    maxBufferedEvents = size_t(2 * minBufferSizeEventSizeRatio) // there is room for two local buffers on the server
                      + size_t(minBufferSizeEventSizeRatio)     // one local buffer can always be fully used
                      + 1;                                      // the other local buffer might contain only one event
  }

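As a worked example with illustrative numbers (not XIOS defaults): with an 8 MB buffer and a 2 MB maximum event size on every connection, the minimum ratio is 4, so maxBufferedEvents = 2*4 + 4 + 1 = 13. A hypothetical check of the arithmetic:

  // Hypothetical check of the formula above, with illustrative sizes.
  #include <cstddef>
  #include <iostream>

  int main()
  {
    double bufferSize   = 8.0 * 1024 * 1024;  // 8 MB buffer per connection
    double maxEventSize = 2.0 * 1024 * 1024;  // 2 MB largest event
    double ratio = bufferSize / maxEventSize;                           // 4.0
    std::size_t maxBufferedEvents = std::size_t(2 * ratio) + std::size_t(ratio) + 1;
    std::cout << maxBufferedEvents << std::endl;                        // prints 13
    return 0;
  }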
  /*!
   * Get the ranks of the servers for which this client is not the leader.
   */
  const std::list<int>& CContextClient::getRanksServerNotLeader(void) const
  {
    return ranksServerNotLeader;
  }

  /*!
   * Check whether this client is connected to at least one server as a non-leader.
   */
  bool CContextClient::isServerNotLeader(void) const
  {
    return !ranksServerNotLeader.empty();
  }

  /*!
   * Get the ranks of the servers for which this client is the leader.
   */
  const std::list<int>& CContextClient::getRanksServerLeader(void) const
  {
    return ranksServerLeader;
  }

  /*!
   * Check whether this client is connected to at least one server as the leader.
   */
  bool CContextClient::isServerLeader(void) const
  {
    return !ranksServerLeader.empty();
  }

  /*!
   * Check whether the attached mode is used (client and server in the same process).
   * \return true if attached mode is enabled
   */
  bool CContextClient::isAttachedModeEnabled() const
  {
    return (parentServer != 0);
  }

  /*!
   * Finalize the context client: flush any temporarily buffered event, send
   * the finalize event to the servers and report buffer memory usage.
   */
  void CContextClient::finalize(void)
  {
    map<int,CClientBuffer*>::iterator itBuff;
    bool stop = false;

    CTimer::get("Blocking time").resume();
    while (hasTemporarilyBufferedEvent())
    {
      checkBuffers();
      sendTemporarilyBufferedEvent();
    }
    CTimer::get("Blocking time").suspend();

    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
    if (isServerLeader())
    {
      CMessage msg;
      const std::list<int>& ranks = getRanksServerLeader();
      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
      {
        info(100) << "DEBUG : Sent context Finalize event to rank " << *itRank << endl;
        event.push(*itRank, 1, msg);
      }
      sendEvent(event);
    }
    else sendEvent(event);

    CTimer::get("Blocking time").resume();
//  while (!stop)
    {
      checkBuffers();
      if (hasTemporarilyBufferedEvent())
        sendTemporarilyBufferedEvent();

      stop = true;
//    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest();
    }
    CTimer::get("Blocking time").suspend();

    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                                          iteMap = mapBufferSize_.end(), itMap;

    StdSize totalBuf = 0;
    for (itMap = itbMap; itMap != iteMap; ++itMap)
    {
      report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl
                 << " +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
      totalBuf += itMap->second;
    }
    report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffers " << totalBuf << " bytes" << endl;

    //releaseBuffers(); // moved to CContext::finalize()
  }

  /*!
   * \return true if at least one sent request is still pending on some buffer
   */
  bool CContextClient::havePendingRequests(void)
  {
    bool pending = false;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      pending |= itBuff->second->hasPendingRequest();
    return pending;
  }
}