source: XIOS/dev/XIOS_DEV_CMIP6/src/context_client.cpp @ 1314

Last change on this file since 1314 was 1232, checked in by mhnguyen, 7 years ago

Fix the blocking problem that occurs when there are more servers than the number of bands in the grid band distribution

+) Correct this problem not only for writing but also for reading
+) Allow "zero-size" domains and axes (i.e. domains or axes with ni = 0 and/or nj = 0)

Tests
+) On Curie
+) Works in both cases: reading and writing data

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 14.8 KB
#include "xios_spl.hpp"
#include "context_client.hpp"
#include "context_server.hpp"
#include "event_client.hpp"
#include "buffer_out.hpp"
#include "buffer_client.hpp"
#include "type.hpp"
#include "event_client.hpp"
#include "context.hpp"
#include "mpi.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "server.hpp"

namespace xios
{
    /*!
    \param [in] parent Pointer to the context on the client side
    \param [in] intraComm_ communicator of the client group
    \param [in] interComm_ communicator of the server group
    \param [in] cxtSer Pointer to the context on the server side (only used in attached mode)
    */
    CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
     : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4)
    {
      context = parent;
      intraComm = intraComm_;
      interComm = interComm_;
      MPI_Comm_rank(intraComm, &clientRank);
      MPI_Comm_size(intraComm, &clientSize);

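      // interComm may be an inter-communicator (client/server mode) or an
      // intra-communicator (attached mode), so query the server count accordingly.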
      int flag;
      MPI_Comm_test_inter(interComm, &flag);
      if (flag) MPI_Comm_remote_size(interComm, &serverSize);
      else  MPI_Comm_size(interComm, &serverSize);

      computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader);

      timeLine = 0;
    }

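    /*!
    Compute the server ranks for which this client acts as leader. Clients and servers
    are matched block-wise: with fewer clients than servers each client leads a
    contiguous block of server ranks, otherwise clients are grouped per server and only
    the first client of each group is the leader.
    \param [in] clientRank rank of the client in the client group
    \param [in] clientSize number of clients
    \param [in] serverSize number of servers
    \param [out] rankRecvLeader ranks of the servers for which this client is the leader
    \param [out] rankRecvNotLeader ranks of the servers to which this client connects without being the leader
    */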
    void CContextClient::computeLeader(int clientRank, int clientSize, int serverSize,
                                       std::list<int>& rankRecvLeader,
                                       std::list<int>& rankRecvNotLeader)
    {
      if ((0 == clientSize) || (0 == serverSize)) return;

      if (clientSize < serverSize)
      {
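        // Fewer clients than servers: each client leads a contiguous block of
        // server ranks, the first "remain" clients getting one extra server.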
        int serverByClient = serverSize / clientSize;
        int remain = serverSize % clientSize;
        int rankStart = serverByClient * clientRank;

        if (clientRank < remain)
        {
          serverByClient++;
          rankStart += clientRank;
        }
        else
          rankStart += remain;

        for (int i = 0; i < serverByClient; i++)
          rankRecvLeader.push_back(rankStart + i);

        rankRecvNotLeader.resize(0);
      }
      else
      {
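        // At least as many clients as servers: clients are grouped per server and
        // only the first client of each group becomes the leader for that server.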
        int clientByServer = clientSize / serverSize;
        int remain = clientSize % serverSize;

        if (clientRank < (clientByServer + 1) * remain)
        {
          if (clientRank % (clientByServer + 1) == 0)
            rankRecvLeader.push_back(clientRank / (clientByServer + 1));
          else
            rankRecvNotLeader.push_back(clientRank / (clientByServer + 1));
        }
        else
        {
          int rank = clientRank - (clientByServer + 1) * remain;
          if (rank % clientByServer == 0)
            rankRecvLeader.push_back(remain + rank / clientByServer);
          else
            rankRecvNotLeader.push_back(remain + rank / clientByServer);
        }
      }
    }

    /*!
    In attached mode, the current context must be reset to the client context after the event has been processed.
    \param [in] event Event to be sent to the servers
    */
    void CContextClient::sendEvent(CEventClient& event)
    {
      list<int> ranks = event.getRanks();

      if (!event.isEmpty())
      {
        list<int> sizes = event.getSizes();

        // We force the getBuffers call to be non-blocking on classical servers
        list<CBufferOut*> buffList;
        bool couldBuffer = getBuffers(ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0) ));
//        bool couldBuffer = getBuffers(ranks, sizes, buffList, CXios::isServer );

        if (couldBuffer)
        {
          event.send(timeLine, sizes, buffList);

          checkBuffers(ranks);

          if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
          {
            waitEvent(ranks);
            CContext::setCurrent(context->getId());
          }
        }
        else
        {
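          // Not enough room in the buffers: copy the event into temporary buffers so
          // this call does not block; it will be flushed later by
          // sendTemporarilyBufferedEvent().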
          tmpBufferedEvent.ranks = ranks;
          tmpBufferedEvent.sizes = sizes;

          for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++)
            tmpBufferedEvent.buffers.push_back(new CBufferOut(*it));

          event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers);
        }
      }

      timeLine++;
    }

    /*!
     * Send the temporarily buffered event (if any).
     *
     * \return true if a temporarily buffered event could be sent, false otherwise
     */
    bool CContextClient::sendTemporarilyBufferedEvent()
    {
      bool couldSendTmpBufferedEvent = false;

      if (hasTemporarilyBufferedEvent())
      {
        list<CBufferOut*> buffList;
        if (getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) // Non-blocking call
        {
          list<CBufferOut*>::iterator it, itBuffer;

          for (it = tmpBufferedEvent.buffers.begin(), itBuffer = buffList.begin(); it != tmpBufferedEvent.buffers.end(); it++, itBuffer++)
            (*itBuffer)->put((char*)(*it)->start(), (*it)->count());

          checkBuffers(tmpBufferedEvent.ranks);

          tmpBufferedEvent.clear();

          couldSendTmpBufferedEvent = true;
        }
      }

      return couldSendTmpBufferedEvent;
    }

    /*!
    If the client is also a server (attached mode), it should process the incoming event
    right away after sending it.
    \param [in] ranks list of ranks of the servers connected to this client
    */
    void CContextClient::waitEvent(list<int>& ranks)
    {
      parentServer->server->setPendingEvent();
      while (checkBuffers(ranks))
      {
        parentServer->server->listen();
        parentServer->server->checkPendingRequest();
      }

      while (parentServer->server->hasPendingEvent())
      {
        parentServer->server->eventLoop();
      }
    }

    /*!
     * Get buffers for each connection to the servers. This function blocks until there is enough room in the buffers
     * unless it is explicitly requested to be non-blocking.
     *
     * \param [in] serverList list of ranks of the connected servers
     * \param [in] sizeList size of the message corresponding to each connection
     * \param [out] retBuffers list of buffers that can be used to store an event
     * \param [in] nonBlocking whether this function should be non-blocking
     * \return whether the already allocated buffers could be used
    */
    bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers,
                                    bool nonBlocking /*= false*/)
    {
      list<int>::const_iterator itServer, itSize;
      list<CClientBuffer*> bufferList;
      map<int,CClientBuffer*>::const_iterator it;
      list<CClientBuffer*>::iterator itBuffer;
      bool areBuffersFree;

      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++)
      {
        it = buffers.find(*itServer);
        if (it == buffers.end())
        {
          newBuffer(*itServer);
          it = buffers.find(*itServer);
        }
        bufferList.push_back(it->second);
      }

      CTimer::get("Blocking time").resume();
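      // Loop until every target buffer has enough free space for its message (or give up
      // immediately in non-blocking mode). While waiting, keep listening for incoming
      // requests so that servers can make progress and free up buffer space.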
      do
      {
        areBuffersFree = true;
        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
          areBuffersFree &= (*itBuffer)->isBufferFree(*itSize);

        if (!areBuffersFree)
        {
          checkBuffers();
          if (CServer::serverLevel == 0)
            context->server->listen();

          else if (CServer::serverLevel == 1)
          {
            context->server->listen();
            for (int i = 0; i < context->serverPrimServer.size(); ++i)
              context->serverPrimServer[i]->listen();
          }

          else if (CServer::serverLevel == 2)
            context->server->listen();

        }
      } while (!areBuffersFree && !nonBlocking);

      CTimer::get("Blocking time").suspend();

      if (areBuffersFree)
      {
        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
          retBuffers.push_back((*itBuffer)->getBuffer(*itSize));
      }

      return areBuffersFree;
    }

   /*!
   Make a new buffer for the connection to the server with the given rank
   \param [in] rank rank of the connected server
   */
   void CContextClient::newBuffer(int rank)
   {
      if (!mapBufferSize_.count(rank))
      {
        error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
        mapBufferSize_[rank] = CXios::minBufferSize;
        maxEventSizes[rank] = CXios::minBufferSize;
      }
      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank], maxBufferedEvents);
      // Notify the server
      CBufferOut* bufOut = buffer->getBuffer(sizeof(StdSize));
      bufOut->put(mapBufferSize_[rank]); // Stupid C++
      buffer->checkBuffer();
   }

   /*!
   Verify the state of the buffers. A buffer is pending while its messages have not yet been delivered.
   \return state of buffers, pending(true), ready(false)
   */
   bool CContextClient::checkBuffers(void)
   {
      map<int,CClientBuffer*>::iterator itBuff;
      bool pending = false;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
        pending |= itBuff->second->checkBuffer();
      return pending;
   }

   //! Release all buffers
   void CContextClient::releaseBuffers()
   {
      map<int,CClientBuffer*>::iterator itBuff;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      {
          delete itBuff->second;
      }
      buffers.clear();
   }

   /*!
   Verify the state of the buffers corresponding to a set of connections
   \param [in] ranks list of ranks of the servers to which the client connects
   \return state of buffers, pending(true), ready(false)
   */
   bool CContextClient::checkBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer();
      return pending;
   }

   /*!
    * Set the buffer size for each connection. Warning: this function is collective.
    *
    * \param [in] mapSize maps the rank of each connected server to the size of the corresponding buffer
    * \param [in] maxEventSize maps the rank of each connected server to the size of the biggest event
   */
   void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)
   {
     mapBufferSize_ = mapSize;
     maxEventSizes = maxEventSize;

     // Compute the maximum number of events that can be safely buffered.
     double minBufferSizeEventSizeRatio = std::numeric_limits<double>::max();
     for (std::map<int,StdSize>::const_iterator it = mapSize.begin(), ite = mapSize.end(); it != ite; ++it)
     {
       double ratio = double(it->second) / maxEventSize.at(it->first);
       if (ratio < minBufferSizeEventSizeRatio) minBufferSizeEventSizeRatio = ratio;
     }
     MPI_Allreduce(MPI_IN_PLACE, &minBufferSizeEventSizeRatio, 1, MPI_DOUBLE, MPI_MIN, intraComm);

     if (minBufferSizeEventSizeRatio < 1.0)
     {
       ERROR("void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)",
             << "The buffer sizes and the maximum events sizes are incoherent.");
     }
     else if (minBufferSizeEventSizeRatio == std::numeric_limits<double>::max())
       minBufferSizeEventSizeRatio = 1.0; // In this case, maxBufferedEvents will never be used but we want to avoid any floating point exception

     maxBufferedEvents = size_t(2 * minBufferSizeEventSizeRatio) // there is room for two local buffers on the server
                          + size_t(minBufferSizeEventSizeRatio)  // one local buffer can always be fully used
                          + 1;                                   // the other local buffer might contain only one event
   }

  /*!
  Get the servers of the connected group for which this client is not the leader
  \return ranks of the non-leading servers
  */
  const std::list<int>& CContextClient::getRanksServerNotLeader(void) const
  {
    return ranksServerNotLeader;
  }

  /*!
  Check if the client connects to servers for which it is not the leader
  \return connected (true), not connected (false)
  */
  bool CContextClient::isServerNotLeader(void) const
  {
    return !ranksServerNotLeader.empty();
  }

  /*!
  Get the leading servers in the group of connected servers
  \return ranks of the leading servers
  */
  const std::list<int>& CContextClient::getRanksServerLeader(void) const
  {
    return ranksServerLeader;
  }

  /*!
  Check if the client connects to a leading server
  \return connected (true), not connected (false)
  */
  bool CContextClient::isServerLeader(void) const
  {
    return !ranksServerLeader.empty();
  }

  /*!
   * Check if the attached mode is used.
   *
   * \return true if and only if attached mode is used
   */
  bool CContextClient::isAttachedModeEnabled() const
  {
    return (parentServer != 0);
  }

   /*!
   * Finalize the context client and produce some reports. This function is non-blocking.
   */
  void CContextClient::finalize(void)
  {
    map<int,CClientBuffer*>::iterator itBuff;
    bool stop = false;

    CTimer::get("Blocking time").resume();
    while (hasTemporarilyBufferedEvent())
    {
      checkBuffers();
      sendTemporarilyBufferedEvent();
    }
    CTimer::get("Blocking time").suspend();

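    // Notify the servers that this context is being finalized: only the server leaders
    // actually carry the message, the other clients just advance their timeline.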
    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
    if (isServerLeader())
    {
      CMessage msg;
      const std::list<int>& ranks = getRanksServerLeader();
      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
        event.push(*itRank, 1, msg);
      sendEvent(event);
    }
    else sendEvent(event);

    CTimer::get("Blocking time").resume();
//    while (!stop)
    {
      checkBuffers();
      if (hasTemporarilyBufferedEvent())
        sendTemporarilyBufferedEvent();

      stop = true;
//      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest();
    }
    CTimer::get("Blocking time").suspend();

    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                                          iteMap = mapBufferSize_.end(), itMap;

    StdSize totalBuf = 0;
    for (itMap = itbMap; itMap != iteMap; ++itMap)
    {
      report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl
                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
      totalBuf += itMap->second;
    }
    report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;

    //releaseBuffers(); // moved to CContext::finalize()
  }


  /*!
  Check whether any buffer still has a pending request
  \return pending request (true), no pending request (false)
  */
  bool CContextClient::havePendingRequests(void)
  {
    bool pending = false;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      pending |= itBuff->second->hasPendingRequest();
    return pending;
  }


}