source: XIOS/trunk/src/context_client.cpp @ 1620

Last change on this file since 1620 was 1615, checked in by ymipsl, 5 years ago

Interpolation enhancement :
Domain area can be used to improve global conservation when there is a discrepancy between the model area and the XIOS-computed area.

New domain attribute :

radius : radius of the spherical domain, used to compute cell areas on the sphere with a normalized radius of 1 (for the remapper).

New domain_interpolate attribute :
use_area : the remapper will use the model-computed area in order to perform global conservation for fluxes integrated over the cell (mass, for example).
In this case the domain attributes "area" and "radius" must be defined on the source or target domain to be taken into account (see the configuration sketch below).

YM
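
To illustrate, a minimal configuration sketch is given below. The element and attribute names (domain, interpolate_domain, radius, use_area) follow this commit message and typical XIOS XML configuration usage, but are an assumption here rather than taken from the reference documentation; in practice the cell areas in "area" are normally supplied by the model at run time through the attribute-setting interface rather than written literally in the XML file.

    <!-- hypothetical iodef.xml fragment -->
    <domain id="dst_domain" radius="6371229">  <!-- e.g. the Earth radius in metres; "area" is set by the model at run time -->
      <!-- use_area asks the remapper to use the model-supplied areas so that
           fluxes integrated over the cells (mass, for example) are globally conserved -->
      <interpolate_domain use_area="true" />
    </domain>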

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 16.2 KB
1#include "xios_spl.hpp"
2#include "context_client.hpp"
3#include "context_server.hpp"
4#include "event_client.hpp"
5#include "buffer_out.hpp"
6#include "buffer_client.hpp"
7#include "type.hpp"
8#include "event_client.hpp"
9#include "context.hpp"
10#include "mpi.hpp"
11#include "timer.hpp"
12#include "cxios.hpp"
13#include "server.hpp"
14
15namespace xios
16{
17    /*!
18    \param [in] parent Pointer to the context on the client side
19    \param [in] intraComm_ communicator of the client group
20    \param [in] interComm_ communicator with the server group
21    \param [in] cxtSer Pointer to the context on the server side (only used in attached mode)
22    */
23    CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
24     : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4)
25    {
26      context = parent;
27      intraComm = intraComm_;
28      interComm = interComm_;
29      MPI_Comm_rank(intraComm, &clientRank);
30      MPI_Comm_size(intraComm, &clientSize);
31
32      int flag;
33      MPI_Comm_test_inter(interComm, &flag);
34      if (flag) MPI_Comm_remote_size(interComm, &serverSize);
35      else  MPI_Comm_size(interComm, &serverSize);
36
37      computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader);
38
39      timeLine = 0;
40    }
41
42    void CContextClient::computeLeader(int clientRank, int clientSize, int serverSize,
43                                       std::list<int>& rankRecvLeader,
44                                       std::list<int>& rankRecvNotLeader)
45    {
46      if ((0 == clientSize) || (0 == serverSize)) return;
47
48      if (clientSize < serverSize)
49      {
50        int serverByClient = serverSize / clientSize;
51        int remain = serverSize % clientSize;
52        int rankStart = serverByClient * clientRank;
53
54        if (clientRank < remain)
55        {
56          serverByClient++;
57          rankStart += clientRank;
58        }
59        else
60          rankStart += remain;
61
62        for (int i = 0; i < serverByClient; i++)
63          rankRecvLeader.push_back(rankStart + i);
64
65        rankRecvNotLeader.resize(0);
66      }
67      else
68      {
69        int clientByServer = clientSize / serverSize;
70        int remain = clientSize % serverSize;
71
72        if (clientRank < (clientByServer + 1) * remain)
73        {
74          if (clientRank % (clientByServer + 1) == 0)
75            rankRecvLeader.push_back(clientRank / (clientByServer + 1));
76          else
77            rankRecvNotLeader.push_back(clientRank / (clientByServer + 1));
78        }
79        else
80        {
81          int rank = clientRank - (clientByServer + 1) * remain;
82          if (rank % clientByServer == 0)
83            rankRecvLeader.push_back(remain + rank / clientByServer);
84          else
85            rankRecvNotLeader.push_back(remain + rank / clientByServer);
86        }
87      }
88    }
89
90    /*!
91    In attached mode, the current context must be reset to the client context after the event has been sent
92    \param [in] event Event to be sent to the server
93    */
94    void CContextClient::sendEvent(CEventClient& event)
95    {
96      list<int> ranks = event.getRanks();
97      info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<endl ;
98      if (CXios::checkEventSync)
99      {
100        int typeId, classId, typeId_in, classId_in, timeLine_out;
101        typeId_in=event.getTypeId() ;
102        classId_in=event.getClassId() ;
103//        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
104        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ; 
105        MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
106        MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
107        if (typeId/clientSize!=event.getTypeId() || classId/clientSize!=event.getClassId() || timeLine_out/clientSize!=timeLine)
108        {
109           ERROR("void CContextClient::sendEvent(CEventClient& event)",
110               << "Event are not coherent between client.");
111        }
112      }
113
114      if (!event.isEmpty())
115      {
116        list<int> sizes = event.getSizes();
117
118        // We force the getBuffers call to be non-blocking on classical servers
119        list<CBufferOut*> buffList;
120        bool couldBuffer = getBuffers(ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0) ));
121//        bool couldBuffer = getBuffers(ranks, sizes, buffList, CXios::isServer );
122
123        if (couldBuffer)
124        {
125          event.send(timeLine, sizes, buffList);
126          info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<"  sent"<<endl ;
127
128          checkBuffers(ranks);
129
130          if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
131          {
132            waitEvent(ranks);
133            CContext::setCurrent(context->getId());
134          }
135        }
136        else
137        {
138          tmpBufferedEvent.ranks = ranks;
139          tmpBufferedEvent.sizes = sizes;
140
141          for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++)
142            tmpBufferedEvent.buffers.push_back(new CBufferOut(*it));
143          info(100)<<"DEBUG : temporaly event created : timeline "<<timeLine<<endl ;
144          event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers);
145          info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<"  sent"<<endl ;
146        }
147      }
148
149      timeLine++;
150    }
151
152    /*!
153     * Send the temporarily buffered event (if any).
154     *
155     * \return true if a temporarily buffered event could be sent, false otherwise
156     */
157    bool CContextClient::sendTemporarilyBufferedEvent()
158    {
159      bool couldSendTmpBufferedEvent = false;
160
161      if (hasTemporarilyBufferedEvent())
162      {
163        list<CBufferOut*> buffList;
164        if (getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) // Non-blocking call
165        {
166          list<CBufferOut*>::iterator it, itBuffer;
167
168          for (it = tmpBufferedEvent.buffers.begin(), itBuffer = buffList.begin(); it != tmpBufferedEvent.buffers.end(); it++, itBuffer++)
169            (*itBuffer)->put((char*)(*it)->start(), (*it)->count());
170
171          info(100)<<"DEBUG : temporaly event sent "<<endl ;
172          checkBuffers(tmpBufferedEvent.ranks);
173
174          tmpBufferedEvent.clear();
175
176          couldSendTmpBufferedEvent = true;
177        }
178      }
179
180      return couldSendTmpBufferedEvent;
181    }
182
183    /*!
184    If the client is also a server (attached mode), then after sending an event it should immediately
185    process the incoming event.
186    \param [in] ranks list of ranks of the servers connected to this client
187    */
188    void CContextClient::waitEvent(list<int>& ranks)
189    {
190      parentServer->server->setPendingEvent();
191      while (checkBuffers(ranks))
192      {
193        parentServer->server->listen();
194        parentServer->server->checkPendingRequest();
195      }
196
197      while (parentServer->server->hasPendingEvent())
198      {
199       parentServer->server->eventLoop();
200      }
201    }
202
203    /*!
204     * Get buffers for each connection to the servers. This function blocks until there is enough room in the buffers unless
205     * it is explicitly requested to be non-blocking.
206     *
207     * \param [in] serverList list of ranks of the connected servers
208     * \param [in] sizeList size of the message corresponding to each connection
209     * \param [out] retBuffers list of buffers that can be used to store an event
210     * \param [in] nonBlocking whether this function should be non-blocking
211     * \return whether the already allocated buffers could be used
212    */
213    bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers,
214                                    bool nonBlocking /*= false*/)
215    {
216      list<int>::const_iterator itServer, itSize;
217      list<CClientBuffer*> bufferList;
218      map<int,CClientBuffer*>::const_iterator it;
219      list<CClientBuffer*>::iterator itBuffer;
220      bool areBuffersFree;
221
222      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++)
223      {
224        it = buffers.find(*itServer);
225        if (it == buffers.end())
226        {
227          newBuffer(*itServer);
228          it = buffers.find(*itServer);
229        }
230        bufferList.push_back(it->second);
231      }
232
233      CTimer::get("Blocking time").resume();
234      do
235      {
236        areBuffersFree = true;
237        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
238          areBuffersFree &= (*itBuffer)->isBufferFree(*itSize);
239
240        if (!areBuffersFree)
241        {
242          checkBuffers();
243          if (CServer::serverLevel == 0)
244            context->server->listen();
245
246          else if (CServer::serverLevel == 1)
247          {
248            context->server->listen();
249            for (int i = 0; i < context->serverPrimServer.size(); ++i)
250              context->serverPrimServer[i]->listen();
251            CServer::contextEventLoop(false) ; // avoid dead-lock at finalize...
252          }
253
254          else if (CServer::serverLevel == 2)
255            context->server->listen();
256
257        }
258      } while (!areBuffersFree && !nonBlocking);
259
260      CTimer::get("Blocking time").suspend();
261
262      if (areBuffersFree)
263      {
264        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
265          retBuffers.push_back((*itBuffer)->getBuffer(*itSize));
266      }
267
268      return areBuffersFree;
269   }
270
271   /*!
272   Create a new buffer for the connection to the server with the given rank
273   \param [in] rank rank of the connected server
274   */
275   void CContextClient::newBuffer(int rank)
276   {
277      if (!mapBufferSize_.count(rank))
278      {
279        error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
280        mapBufferSize_[rank] = CXios::minBufferSize;
281        maxEventSizes[rank] = CXios::minBufferSize;
282      }
283      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank], maxBufferedEvents);
284      // Notify the server
285      CBufferOut* bufOut = buffer->getBuffer(sizeof(StdSize));
286      bufOut->put(mapBufferSize_[rank]); // Stupid C++
287      buffer->checkBuffer();
288   }
289
290   /*!
291   Verify the state of the buffers. A buffer is pending while it still has an outstanding request
292   \return state of the buffers, pending (true), ready (false)
293   */
294   bool CContextClient::checkBuffers(void)
295   {
296      map<int,CClientBuffer*>::iterator itBuff;
297      bool pending = false;
298      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
299        pending |= itBuff->second->checkBuffer();
300      return pending;
301   }
302
303   //! Release all buffers
304   void CContextClient::releaseBuffers()
305   {
306      map<int,CClientBuffer*>::iterator itBuff;
307      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
308      {
309          delete itBuff->second;
310      }
311      buffers.clear();
312   }
313
314   /*!
315   Verify the state of the buffers corresponding to a set of connections
316   \param [in] ranks list of ranks of the servers to which the client connects
317   \return state of the buffers, pending (true), ready (false)
318   */
319   bool CContextClient::checkBuffers(list<int>& ranks)
320   {
321      list<int>::iterator it;
322      bool pending = false;
323      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer();
324      return pending;
325   }
326
327   /*!
328    * Set the buffer size for each connection. Warning: This function is collective.
329    *
330    * \param [in] mapSize maps the rank of the connected servers to the size of the corresponding buffer
331    * \param [in] maxEventSize maps the rank of the connected servers to the size of the biggest event
332   */
333   void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)
334   {
335     mapBufferSize_ = mapSize;
336     maxEventSizes = maxEventSize;
337
338     // Compute the maximum number of events that can be safely buffered.
339     double minBufferSizeEventSizeRatio = std::numeric_limits<double>::max();
340     for (std::map<int,StdSize>::const_iterator it = mapSize.begin(), ite = mapSize.end(); it != ite; ++it)
341     {
342       double ratio = double(it->second) / maxEventSizes[it->first];
343       if (ratio < minBufferSizeEventSizeRatio) minBufferSizeEventSizeRatio = ratio;
344     }
345     MPI_Allreduce(MPI_IN_PLACE, &minBufferSizeEventSizeRatio, 1, MPI_DOUBLE, MPI_MIN, intraComm);
346
347     if (minBufferSizeEventSizeRatio < 1.0)
348     {
349       ERROR("void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)",
350             << "The buffer sizes and the maximum events sizes are incoherent.");
351     }
352     else if (minBufferSizeEventSizeRatio == std::numeric_limits<double>::max())
353       minBufferSizeEventSizeRatio = 1.0; // In this case, maxBufferedEvents will never be used but we want to avoid any floating point exception
354
355     maxBufferedEvents = size_t(2 * minBufferSizeEventSizeRatio) // there is room for two local buffers on the server
356                          + size_t(minBufferSizeEventSizeRatio)  // one local buffer can always be fully used
357                          + 1;                                   // the other local buffer might contain only one event
358   }
359
360  /*!
361  Get the ranks of the connected servers for which this client is not a leader
362  \return ranks of the non-leading servers
363  */
364  const std::list<int>& CContextClient::getRanksServerNotLeader(void) const
365  {
366    return ranksServerNotLeader;
367  }
368
369  /*!
370  Check whether the client is connected to at least one server for which it is not a leader
371  \return true if so, false otherwise
372  */
373  bool CContextClient::isServerNotLeader(void) const
374  {
375    return !ranksServerNotLeader.empty();
376  }
377
378  /*!
379  Get the ranks of the connected servers for which this client is the leader
380  \return ranks of the leading servers
381  */
382  const std::list<int>& CContextClient::getRanksServerLeader(void) const
383  {
384    return ranksServerLeader;
385  }
386
387  /*!
388  Check whether the client is the leader for at least one of the connected servers
389  \return true if so, false otherwise
390  */
391  bool CContextClient::isServerLeader(void) const
392  {
393    return !ranksServerLeader.empty();
394  }
395
396  /*!
397   * Check if the attached mode is used.
398   *
399   * \return true if and only if attached mode is used
400   */
401  bool CContextClient::isAttachedModeEnabled() const
402  {
403    return (parentServer != 0);
404  }
405
406   /*!
407   * Finalize the context client and report buffer usage. The function is non-blocking.
408   */
409  void CContextClient::finalize(void)
410  {
411    map<int,CClientBuffer*>::iterator itBuff;
412    bool stop = false;
413
414    CTimer::get("Blocking time").resume();
415    while (hasTemporarilyBufferedEvent())
416    {
417      checkBuffers();
418      sendTemporarilyBufferedEvent();
419    }
420    CTimer::get("Blocking time").suspend();
421
422    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
423    if (isServerLeader())
424    {
425      CMessage msg;
426      const std::list<int>& ranks = getRanksServerLeader();
427      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
428      {
429        info(100)<<"DEBUG : Sent context Finalize event to rank "<<*itRank<<endl ;
430        event.push(*itRank, 1, msg);
431      }
432      sendEvent(event);
433    }
434    else sendEvent(event);
435
436    CTimer::get("Blocking time").resume();
437//    while (!stop)
438    {
439      checkBuffers();
440      if (hasTemporarilyBufferedEvent())
441        sendTemporarilyBufferedEvent();
442
443      stop = true;
444//      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest();
445    }
446    CTimer::get("Blocking time").suspend();
447
448    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
449                                          iteMap = mapBufferSize_.end(), itMap;
450
451    StdSize totalBuf = 0;
452    for (itMap = itbMap; itMap != iteMap; ++itMap)
453    {
454      report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl
455                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
456      totalBuf += itMap->second;
457    }
458    report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;
459
460    //releaseBuffers(); // moved to CContext::finalize()
461  }
462
463
464  /*! Check whether any buffer still has a pending request.
465      \return true if at least one request is pending, false otherwise */
466  bool CContextClient::havePendingRequests(void)
467  {
468    bool pending = false;
469    map<int,CClientBuffer*>::iterator itBuff;
470    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
471      pending |= itBuff->second->hasPendingRequest();
472    return pending;
473  }
474
475
476}