source: XIOS/dev/dev_olga/src/context_client.cpp @ 1193

Last change on this file since 1193 was 1193, checked in by oabramkina, 7 years ago

Two server levels: fixing a bug during context finalization. The last buffer check for the connection from server (classical or primary) to client must be blocking; otherwise messages sent by the server to the client can be lost, causing a deadlock on the client side at context finalization.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 15.2 KB
#include "xios_spl.hpp"
#include "context_client.hpp"
#include "context_server.hpp"
#include "event_client.hpp"
#include "buffer_out.hpp"
#include "buffer_client.hpp"
#include "type.hpp"
#include "context.hpp"
#include "mpi.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "server.hpp"

namespace xios
{
    /*!
    \param [in] parent Pointer to the context on the client side
    \param [in] intraComm_ communicator of the client group
    \param [in] interComm_ communicator of the server group
    \param [in] cxtSer Pointer to the context on the server side (only used in attached mode)
    */
    CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
     : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4)
    {
      context = parent;
      intraComm = intraComm_;
      interComm = interComm_;
      MPI_Comm_rank(intraComm, &clientRank);
      MPI_Comm_size(intraComm, &clientSize);

      int flag;
      MPI_Comm_test_inter(interComm, &flag);
      if (flag) MPI_Comm_remote_size(interComm, &serverSize);
      else  MPI_Comm_size(interComm, &serverSize);

      if (clientSize < serverSize)
      {
        int serverByClient = serverSize / clientSize;
        int remain = serverSize % clientSize;
        int rankStart = serverByClient * clientRank;

        if (clientRank < remain)
        {
          serverByClient++;
          rankStart += clientRank;
        }
        else
          rankStart += remain;

        for (int i = 0; i < serverByClient; i++)
          ranksServerLeader.push_back(rankStart + i);

        ranksServerNotLeader.resize(0);
      }
      else
      {
        int clientByServer = clientSize / serverSize;
        int remain = clientSize % serverSize;

        if (clientRank < (clientByServer + 1) * remain)
        {
          if (clientRank % (clientByServer + 1) == 0)
            ranksServerLeader.push_back(clientRank / (clientByServer + 1));
          else
            ranksServerNotLeader.push_back(clientRank / (clientByServer + 1));
        }
        else
        {
          int rank = clientRank - (clientByServer + 1) * remain;
          if (rank % clientByServer == 0)
            ranksServerLeader.push_back(remain + rank / clientByServer);
          else
            ranksServerNotLeader.push_back(remain + rank / clientByServer);
        }
      }
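      // Illustrative example (hypothetical sizes): with 5 clients and 2 servers, clientByServer = 2 and
      // remain = 1, so clients 0-2 are mapped to server 0 and clients 3-4 to server 1; clients 0 and 3
      // become the leaders of their servers, while clients 1, 2 and 4 go into ranksServerNotLeader.
      // Conversely, with 2 clients and 5 servers, client 0 leads servers 0-2 and client 1 leads
      // servers 3-4, and ranksServerNotLeader stays empty.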

      timeLine = 0;
    }

    /*!
    Send an event to the connected server(s). In attached mode, the current context must be reset to the client context afterwards.
    \param [in] event Event to be sent to the server
    */
    void CContextClient::sendEvent(CEventClient& event)
    {
      list<int> ranks = event.getRanks();

      if (!event.isEmpty())
      {
        list<int> sizes = event.getSizes();

        // We force the getBuffers call to be non-blocking on classical servers
        list<CBufferOut*> buffList;
        bool couldBuffer = getBuffers(ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0) ));
//        bool couldBuffer = getBuffers(ranks, sizes, buffList, CXios::isServer );

        if (couldBuffer)
        {
          event.send(timeLine, sizes, buffList);

          checkBuffers(ranks);

          if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
          {
            waitEvent(ranks);
            CContext::setCurrent(context->getId());
          }
        }
        else
        {
          tmpBufferedEvent.ranks = ranks;
          tmpBufferedEvent.sizes = sizes;

          for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++)
            tmpBufferedEvent.buffers.push_back(new CBufferOut(*it));

          event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers);
        }
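        // Note: in the fallback branch above, the event has just been serialized into newly allocated
        // temporary buffers; the data is copied into the real client buffers later, once room is
        // available, by sendTemporarilyBufferedEvent().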
      }

      timeLine++;
    }

    /*!
     * Send the temporarily buffered event (if any).
     *
     * \return true if a temporarily buffered event could be sent, false otherwise
     */
    bool CContextClient::sendTemporarilyBufferedEvent()
    {
      bool couldSendTmpBufferedEvent = false;

      if (hasTemporarilyBufferedEvent())
      {
        list<CBufferOut*> buffList;
        if (getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) // Non-blocking call
        {
          list<CBufferOut*>::iterator it, itBuffer;

          for (it = tmpBufferedEvent.buffers.begin(), itBuffer = buffList.begin(); it != tmpBufferedEvent.buffers.end(); it++, itBuffer++)
            (*itBuffer)->put((char*)(*it)->start(), (*it)->count());
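          // The bytes serialized earlier into the temporary buffers are copied verbatim into the
          // real client buffers now that enough room is available.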

          checkBuffers(tmpBufferedEvent.ranks);

          tmpBufferedEvent.clear();

          couldSendTmpBufferedEvent = true;
        }
      }

      return couldSendTmpBufferedEvent;
    }

    /*!
    If the client is also a server (attached mode), it should process the incoming event right away
    after sending its own event.
    \param [in] ranks list of ranks of the servers connected to this client
    */
    void CContextClient::waitEvent(list<int>& ranks)
    {
      parentServer->server->setPendingEvent();
      while (checkBuffers(ranks))
      {
        parentServer->server->listen();
        parentServer->server->checkPendingRequest();
      }

      while (parentServer->server->hasPendingEvent())
      {
        parentServer->server->eventLoop();
      }
    }

    /*!
     * Get buffers for each connection to the servers. This function blocks until there is enough room in the buffers unless
     * it is explicitly requested to be non-blocking.
     *
     * \param [in] serverList list of ranks of the connected servers
     * \param [in] sizeList size of the message corresponding to each connection
     * \param [out] retBuffers list of buffers that can be used to store an event
     * \param [in] nonBlocking whether this function should be non-blocking
     * \return whether the already allocated buffers could be used
    */
    bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers,
                                    bool nonBlocking /*= false*/)
    {
      list<int>::const_iterator itServer, itSize;
      list<CClientBuffer*> bufferList;
      map<int,CClientBuffer*>::const_iterator it;
      list<CClientBuffer*>::iterator itBuffer;
      bool areBuffersFree;

      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++)
      {
        it = buffers.find(*itServer);
        if (it == buffers.end())
        {
          newBuffer(*itServer);
          it = buffers.find(*itServer);
        }
        bufferList.push_back(it->second);
      }

      CTimer::get("Blocking time").resume();
      do
      {
        areBuffersFree = true;
        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
          areBuffersFree &= (*itBuffer)->isBufferFree(*itSize);

        if (!areBuffersFree)
        {
          checkBuffers();
          if (CServer::serverLevel == 0)
            context->server->listen();

          else if (CServer::serverLevel == 1)
          {
            context->server->listen();
            for (int i = 0; i < context->serverPrimServer.size(); ++i)
              context->serverPrimServer[i]->listen();
          }

          else if (CServer::serverLevel == 2)
            context->server->listen();

        }
      } while (!areBuffersFree && !nonBlocking);
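      // While waiting for buffer space, the loop above also listens on this process's own server
      // side(s), so that a process acting both as client and server keeps handling incoming
      // requests instead of deadlocking.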

      CTimer::get("Blocking time").suspend();

      if (areBuffersFree)
      {
        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
          retBuffers.push_back((*itBuffer)->getBuffer(*itSize));
      }

      return areBuffersFree;
   }

   /*!
   Make a new buffer for the connection to the server with the given rank
   \param [in] rank rank of the connected server
   */
   void CContextClient::newBuffer(int rank)
   {
     if (!mapBufferSize_.count(rank))
     {
       error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
       mapBufferSize_[rank] = CXios::minBufferSize;
     }
     CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxBufferedEvents);
     // Notify the server
     CBufferOut* bufOut = buffer->getBuffer(sizeof(StdSize));
     bufOut->put(mapBufferSize_[rank]); // Stupid C++
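     // The message just written is the buffer size itself, presumably so that the receiving side
     // can allocate a matching buffer.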
     buffer->checkBuffer();
   }

   /*!
   Verify the state of the buffers. A buffer is in pending state if there is no message in it.
   \return state of the buffers, pending (true), ready (false)
   */
   bool CContextClient::checkBuffers(void)
   {
      map<int,CClientBuffer*>::iterator itBuff;
      bool pending = false;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
        pending |= itBuff->second->checkBuffer();
      return pending;
   }

   //! Release all buffers
   void CContextClient::releaseBuffers()
   {
      map<int,CClientBuffer*>::iterator itBuff;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      {
          delete itBuff->second;
      }
      buffers.clear();
   }

   /*!
   Verify the state of the buffers corresponding to a connection
   \param [in] ranks list of ranks of the servers to which the client connects
   \return state of the buffers, pending (true), ready (false)
   */
   bool CContextClient::checkBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer();
      return pending;
   }

   /*!
    * Set the buffer size for each connection. Warning: This function is collective.
    *
    * \param [in] mapSize maps the rank of the connected servers to the size of the corresponding buffer
    * \param [in] maxEventSize maps the rank of the connected servers to the size of the biggest event
   */
   void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)
   {
     mapBufferSize_ = mapSize;

     // Compute the maximum number of events that can be safely buffered.
     double minBufferSizeEventSizeRatio = std::numeric_limits<double>::max();
     for (std::map<int,StdSize>::const_iterator it = mapSize.begin(), ite = mapSize.end(); it != ite; ++it)
     {
       double ratio = double(it->second) / maxEventSize.at(it->first);
       if (ratio < minBufferSizeEventSizeRatio) minBufferSizeEventSizeRatio = ratio;
     }
     MPI_Allreduce(MPI_IN_PLACE, &minBufferSizeEventSizeRatio, 1, MPI_DOUBLE, MPI_MIN, intraComm);

     if (minBufferSizeEventSizeRatio < 1.0)
       ERROR("void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)",
             << "The buffer sizes and the maximum events sizes are incoherent.");

     maxBufferedEvents = size_t(2 * minBufferSizeEventSizeRatio) // there is room for two local buffers on the server
                          + size_t(minBufferSizeEventSizeRatio)  // one local buffer can always be fully used
                          + 1;                                   // the other local buffer might contain only one event
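     // Illustrative example (hypothetical sizes): if every buffer is 10 MB and the largest event on
     // any connection is 4 MB, the minimum ratio is 2.5 and maxBufferedEvents = 5 + 2 + 1 = 8.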
   }

  /*!
  Get the servers in the connected group for which this client is not the leader
  \return ranks of these servers
  */
  const std::list<int>& CContextClient::getRanksServerNotLeader(void) const
  {
    return ranksServerNotLeader;
  }

  /*!
  Check whether the client is connected to at least one server for which it is not the leader
  \return true if so, false otherwise
  */
  bool CContextClient::isServerNotLeader(void) const
  {
    return !ranksServerNotLeader.empty();
  }

  /*!
  Get the servers in the connected group for which this client is the leader
  \return ranks of these servers
  */
  const std::list<int>& CContextClient::getRanksServerLeader(void) const
  {
    return ranksServerLeader;
  }

  /*!
  Check whether the client is the leader of at least one connected server
  \return true if so, false otherwise
  */
  bool CContextClient::isServerLeader(void) const
  {
    return !ranksServerLeader.empty();
  }

  /*!
   * Check if the attached mode is used.
   *
   * \return true if and only if attached mode is used
   */
  bool CContextClient::isAttachedModeEnabled() const
  {
    return (parentServer != 0);
  }

  /*!
   * Finalize the context client and write some reports. The function is non-blocking.
   */
  void CContextClient::finalize(void)
  {
    map<int,CClientBuffer*>::iterator itBuff;
    bool stop = false;

    CTimer::get("Blocking time").resume();
    while (hasTemporarilyBufferedEvent())
    {
      checkBuffers();
      sendTemporarilyBufferedEvent();
    }
    CTimer::get("Blocking time").suspend();

    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
    if (isServerLeader())
    {
      CMessage msg;
      const std::list<int>& ranks = getRanksServerLeader();
      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
        event.push(*itRank, 1, msg);
      sendEvent(event);
    }
    else sendEvent(event);
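    // Only the leader clients push a message for their servers; for the other clients the event
    // stays empty, so sendEvent() merely increments the timeLine counter, presumably to keep the
    // event numbering consistent across all clients.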

    CTimer::get("Blocking time").resume();
//    while (!stop)
    {
      checkBuffers();
      if (hasTemporarilyBufferedEvent())
        sendTemporarilyBufferedEvent();

      stop = true;
//      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest();
    }
    CTimer::get("Blocking time").suspend();

    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                                          iteMap = mapBufferSize_.end(), itMap;

    StdSize totalBuf = 0;
    for (itMap = itbMap; itMap != iteMap; ++itMap)
    {
      report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl
                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
      totalBuf += itMap->second;
    }
    report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;

    //releaseBuffers(); // moved to CContext::finalize()
  }

  /*!
   * Post-finalize the context client. The function is non-blocking.
   */
  void CContextClient::postFinalize(void)
  {
    map<int,CClientBuffer*>::iterator itBuff;
    bool stop = false;

    CTimer::get("Blocking time").resume();
    while (hasTemporarilyBufferedEvent())
    {
      checkBuffers();
      sendTemporarilyBufferedEvent();
    }
    CTimer::get("Blocking time").suspend();

    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_POST_FINALIZE);
    if (isServerLeader())
    {
      CMessage msg;
      const std::list<int>& ranks = getRanksServerLeader();
      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
        event.push(*itRank, 1, msg);
      sendEvent(event);
    }
    else sendEvent(event);

    CTimer::get("Blocking time").resume();
//    while (!stop)
    {
      checkBuffers();
      if (hasTemporarilyBufferedEvent())
        sendTemporarilyBufferedEvent();
      stop = true;
//      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest();
    }
    CTimer::get("Blocking time").suspend();

  }

  /*!
  Check whether any buffer still has a pending request
  \return pending request exists (true), otherwise (false)
  */
  bool CContextClient::havePendingRequests(void)
  {
    bool pending = false;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      pending |= itBuff->second->hasPendingRequest();
    return pending;
  }


}