source: XIOS/trunk/src/context_client.cpp @ 934

Last change on this file since 934 was 917, checked in by rlacroix, 8 years ago

Fix the client/server communication protocol.

In some extreme cases a deadlock could occur. To fix this, the number of buffered events must be properly limited.

If you notice decreased performance due to this commit, please let us know about it.

Fixes ticket #91.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 10.4 KB
#include "xios_spl.hpp"
#include "context_client.hpp"
#include "context_server.hpp"
#include "event_client.hpp"
#include "buffer_out.hpp"
#include "buffer_client.hpp"
#include "type.hpp"
#include "context.hpp"
#include "mpi.hpp"
#include "timer.hpp"
#include "cxios.hpp"

namespace xios
{
    /*!
    \param [in] parent Pointer to the context on the client side
    \param [in] intraComm_ Communicator of the client group
    \param [in] interComm_ Communicator of the server group
    \param [in] cxtSer Pointer to the context on the server side (only used in attached mode)
    */
    CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
     : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4)
    {
      context = parent;
      intraComm = intraComm_;
      interComm = interComm_;
      MPI_Comm_rank(intraComm, &clientRank);
      MPI_Comm_size(intraComm, &clientSize);

      int flag;
      MPI_Comm_test_inter(interComm, &flag);
      if (flag) MPI_Comm_remote_size(interComm, &serverSize);
      else MPI_Comm_size(interComm, &serverSize);

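      // Compute the list of server ranks for which this client acts as leader.
      // With more servers than clients, each client leads a contiguous block of
      // servers; otherwise the clients are split into groups of (almost) equal
      // size and only the first client of each group leads its server.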
      if (clientSize < serverSize)
      {
        int serverByClient = serverSize / clientSize;
        int remain = serverSize % clientSize;
        int rankStart = serverByClient * clientRank;

        if (clientRank < remain)
        {
          serverByClient++;
          rankStart += clientRank;
        }
        else
          rankStart += remain;

        for (int i = 0; i < serverByClient; i++)
          ranksServerLeader.push_back(rankStart + i);
      }
      else
      {
        int clientByServer = clientSize / serverSize;
        int remain = clientSize % serverSize;

        if (clientRank < (clientByServer + 1) * remain)
        {
          if (clientRank % (clientByServer + 1) == 0)
            ranksServerLeader.push_back(clientRank / (clientByServer + 1));
        }
        else
        {
          int rank = clientRank - (clientByServer + 1) * remain;
          if (rank % clientByServer == 0)
            ranksServerLeader.push_back(remain + rank / clientByServer);
        }
      }

      timeLine = 0;
    }

    /*!
    In attached mode, the current context must be reset to the client context
    after the event has been sent.
    \param [in] event Event sent to the server
    */
    void CContextClient::sendEvent(CEventClient& event)
    {
      list<int> ranks = event.getRanks();
      if (!event.isEmpty())
      {
        list<int> sizes = event.getSizes();

        list<CBufferOut*> buffList = getBuffers(ranks, sizes);

        event.send(timeLine, sizes, buffList);

        checkBuffers(ranks);
      }

      if (isAttachedModeEnabled())
      {
        waitEvent(ranks);
        CContext::setCurrent(context->getId());
      }

      timeLine++;
    }

    /*!
    If the client is also a server (attached mode), it should process the
    incoming event right away after sending it.
    \param [in] ranks List of ranks of the servers connected to this client
    */
    void CContextClient::waitEvent(list<int>& ranks)
    {
      parentServer->server->setPendingEvent();
      while (checkBuffers(ranks))
      {
        parentServer->server->listen();
        parentServer->server->checkPendingRequest();
      }

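      // Process the received events until none is left pending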
      while (parentServer->server->hasPendingEvent())
      {
        parentServer->server->eventLoop();
      }
    }

    /*!
    Set up the buffer for each connection to a server and wait until the buffers
    are free enough for the event to be copied into them.
    \param [in] serverList List of ranks of the connected servers
    \param [in] sizeList Size of the message for each connection
    \return List of buffers in which the event can be placed
    */
    list<CBufferOut*> CContextClient::getBuffers(list<int>& serverList, list<int>& sizeList)
    {
      list<int>::iterator itServer, itSize;
      list<CClientBuffer*> bufferList;
      map<int,CClientBuffer*>::iterator it;
      list<CClientBuffer*>::iterator itBuffer;
      list<CBufferOut*>  retBuffer;
      bool areBuffersFree;

      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++)
      {
        it = buffers.find(*itServer);
        if (it == buffers.end())
        {
          newBuffer(*itServer);
          it = buffers.find(*itServer);
        }
        bufferList.push_back(it->second);
      }

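      // Block until every target buffer has enough free space for the event.
      // While waiting, keep checking the buffers so that pending MPI requests
      // can complete and the server can make progress.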
      CTimer::get("Blocking time").resume();
      do
      {
        areBuffersFree = true;
        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
          areBuffersFree &= (*itBuffer)->isBufferFree(*itSize);

        if (!areBuffersFree)
        {
          checkBuffers();
          context->server->listen();
        }
      } while (!areBuffersFree);
      CTimer::get("Blocking time").suspend();

      for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
      {
        retBuffer.push_back((*itBuffer)->getBuffer(*itSize));
      }
      return retBuffer;
   }

   /*!
   Create a new buffer for the connection to the server with the given rank
   \param [in] rank Rank of the connected server
   */
   void CContextClient::newBuffer(int rank)
   {
      if (!mapBufferSize_.count(rank))
      {
        error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
        mapBufferSize_[rank] = CXios::minBufferSize;
      }
      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxBufferedEvents);
      // Notify the server of the buffer size by sending it as the first message
      CBufferOut* bufOut = buffer->getBuffer(sizeof(StdSize));
      bufOut->put(mapBufferSize_[rank]);
      buffer->checkBuffer();
   }

   /*!
   Verify the state of the buffers
   \return State of the buffers: pending (true), ready (false)
   */
   bool CContextClient::checkBuffers(void)
   {
      map<int,CClientBuffer*>::iterator itBuff;
      bool pending = false;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) pending |= itBuff->second->checkBuffer();
      return pending;
   }

   //! Release all buffers
   void CContextClient::releaseBuffers(void)
   {
      map<int,CClientBuffer*>::iterator itBuff;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) delete itBuff->second;
   }

   /*!
   Verify the state of the buffers corresponding to a set of connections
   \param [in] ranks List of ranks of the servers to which the client is connected
   \return State of the buffers: pending (true), ready (false)
   */
   bool CContextClient::checkBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer();
      return pending;
   }

   /*!
    * Set the buffer size for each connection. Warning: This function is collective.
    *
    * \param [in] mapSize maps the rank of the connected servers to the size of the corresponding buffer
    * \param [in] maxEventSize maps the rank of the connected servers to the size of the biggest event
    */
   void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)
   {
     mapBufferSize_ = mapSize;

     // Compute the maximum number of events that can be safely buffered.
     double minBufferSizeEventSizeRatio = std::numeric_limits<double>::max();
     for (std::map<int,StdSize>::const_iterator it = mapSize.begin(), ite = mapSize.end(); it != ite; ++it)
     {
       double ratio = double(it->second) / maxEventSize.at(it->first);
       if (ratio < minBufferSizeEventSizeRatio) minBufferSizeEventSizeRatio = ratio;
     }
     MPI_Allreduce(MPI_IN_PLACE, &minBufferSizeEventSizeRatio, 1, MPI_DOUBLE, MPI_MIN, intraComm);
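     // All clients must agree on the number of buffered events, hence the
     // global minimum reduction over the intra-communicator above.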

     if (minBufferSizeEventSizeRatio < 1.0)
       ERROR("void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)",
             << "The buffer sizes and the maximum events sizes are incoherent.");

     maxBufferedEvents = size_t(2 * minBufferSizeEventSizeRatio) // there is room for two local buffers on the server
                          + size_t(minBufferSizeEventSizeRatio)  // one local buffer can always be fully used
                          + 1;                                   // the other local buffer might contain only one event
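     // For example, a minimum ratio of 2.5 gives
     // maxBufferedEvents = size_t(5.0) + size_t(2.5) + 1 = 5 + 2 + 1 = 8.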
   }

  /*!
  Get the leading servers in the group of connected servers
  \return Ranks of the leading servers
  */
  const std::list<int>& CContextClient::getRanksServerLeader(void) const
  {
    return ranksServerLeader;
  }

  /*!
  Check if the client is connected to a leading server
  \return connected (true), not connected (false)
  */
  bool CContextClient::isServerLeader(void) const
  {
    return !ranksServerLeader.empty();
  }

  /*!
   * Check if the attached mode is used.
   *
   * \return true if and only if attached mode is used
   */
  bool CContextClient::isAttachedModeEnabled() const
  {
    return (parentServer != 0);
  }

   /*!
   Finalize the context client and report the memory used by the buffers
   */
   void CContextClient::finalize(void)
   {
     map<int,CClientBuffer*>::iterator itBuff;
     bool stop = true;

     CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
     if (isServerLeader())
     {
       CMessage msg;
       const std::list<int>& ranks = getRanksServerLeader();
       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
         event.push(*itRank, 1, msg);
       sendEvent(event);
     }
     else sendEvent(event);

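     // Wait until all pending send requests on the buffers have completed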
     CTimer::get("Blocking time").resume();
     while (stop)
     {
       checkBuffers();
       stop = false;
       for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop |= itBuff->second->hasPendingRequest();
     }
     CTimer::get("Blocking time").suspend();

     std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                                           iteMap = mapBufferSize_.end(), itMap;
     StdSize totalBuf = 0;
     for (itMap = itbMap; itMap != iteMap; ++itMap)
     {
       report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl
                  << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
       totalBuf += itMap->second;
     }
     report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;

     releaseBuffers();
   }
}