source: XIOS/trunk/src/context_client.cpp @ 596

Last change on this file since 596 was 596, checked in by rlacroix, 9 years ago

Ensure the buffer sizes are sent synchronously to the servers when using the attached mode.

This issue had no effect in practice; however, it is safer to ensure proper syncing since it might become critical for future changes.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 10.0 KB
#include "xios_spl.hpp"
#include "context_client.hpp"
#include "context_server.hpp"
#include "event_client.hpp"
#include "buffer_out.hpp"
#include "buffer_client.hpp"
#include "type.hpp"
#include "message.hpp"
#include "context.hpp"
#include "mpi.hpp"
#include "timer.hpp"
#include "cxios.hpp"

namespace xios
{
    /*!
    \param [in] parent Pointer to the context on the client side
    \param [in] intraComm_ Communicator of the client group
    \param [in] interComm_ Communicator of the server group
    \param [in] cxtSer Pointer to the context on the server side (only used in attached mode)
    */
    CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
     : mapBufferSize_(), parentServer(cxtSer)
    {
      context = parent;
      intraComm = intraComm_;
      interComm = interComm_;
      MPI_Comm_rank(intraComm, &clientRank);
      MPI_Comm_size(intraComm, &clientSize);

      int flag;
      MPI_Comm_test_inter(interComm, &flag);
      if (flag) MPI_Comm_remote_size(interComm, &serverSize);
      else  MPI_Comm_size(interComm, &serverSize);

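      // Determine the servers for which this client acts as the "leader":
      // every server rank gets exactly one leading client. When there are more
      // servers than clients, each client leads a contiguous block of server
      // ranks; otherwise only the first client of each block of clients mapped
      // to a given server becomes that server's leader.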
      if (clientSize < serverSize)
      {
        int serverByClient = serverSize / clientSize;
        int remain = serverSize % clientSize;
        int rankStart = serverByClient * clientRank;

        if (clientRank < remain)
        {
          serverByClient++;
          rankStart += clientRank;
        }
        else
          rankStart += remain;

        for (int i = 0; i < serverByClient; i++)
          ranksServerLeader.push_back(rankStart + i);
      }
      else
      {
        int clientByServer = clientSize / serverSize;
        int remain = clientSize % serverSize;

        if (clientRank < (clientByServer + 1) * remain)
        {
          if (clientRank % (clientByServer + 1) == 0)
            ranksServerLeader.push_back(clientRank / (clientByServer + 1));
        }
        else
        {
          int rank = clientRank - (clientByServer + 1) * remain;
          if (rank % clientByServer == 0)
            ranksServerLeader.push_back(remain + rank / clientByServer);
        }
      }

      timeLine = 0;
    }

    /*!
    In attached mode, the current context must be reset to the client context after the event has been processed.
    \param [in] event Event to send to the server
    */
    void CContextClient::sendEvent(CEventClient& event)
    {
      list<int>::iterator itServer;
      list<int> ranks;
      list<int> sizes;
      list<int>::iterator itSize;

      ranks = event.getRanks();
      if (!event.isEmpty())
      {
        sizes = event.getSizes();
        CMessage msg;

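        // Each target buffer receives a header made of the message size and the
        // current timeLine; build a dummy header first to know its size and add
        // it to every requested buffer size.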
        msg << *(sizes.begin()) << timeLine;
        for (list<int>::iterator it = sizes.begin(); it != sizes.end(); it++) *it += msg.size();
        list<CBufferOut*> buffList = getBuffers(ranks, sizes);

        list<CBufferOut*>::iterator it;
        for (it = buffList.begin(), itSize = sizes.begin(); it != buffList.end(); ++it, ++itSize)
        {
          **it << *itSize << timeLine;
        }
        event.send(buffList);
        checkBuffers(ranks);
      }

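      // In attached mode the client and the server live in the same process:
      // process the incoming event on the server side right away, then switch
      // the current context back to the client one.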
      if (0 != parentServer) // attached mode
      {
        waitEvent(ranks);
        CContext::setCurrent(context->getId());
      }

      timeLine++;
    }

    /*!
    Special function that sets up the buffer size not only on the client side but also on the server
    side of each connection.
    */
    void CContextClient::sendBufferSizeEvent()
    {
      std::map<int,CClientBuffer*>::iterator it, itE;
      std::map<int,StdSize>::const_iterator itMap = mapBufferSize_.begin(), iteMap = mapBufferSize_.end();

      if (itMap == iteMap)
         ERROR("void CContextClient::sendBufferSizeEvent()",
              << "No information about the server buffers, this should not happen...");

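      // Make sure a buffer exists for every known server connection, then write
      // the negotiated size into each buffer so that the server side can set up
      // a matching buffer for the connection.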
      for (; itMap != iteMap; ++itMap)
      {
        if (buffers.end() == buffers.find(itMap->first))
          newBuffer(itMap->first);
      }

      CBufferOut* bufOut(NULL);
      itE = buffers.end();
      for (it = buffers.begin(); it != itE; ++it)
      {
        bufOut = (it->second)->getBuffer(sizeof(StdSize));
        bufOut->put(mapBufferSize_[it->first]); // write the buffer size for this connection
        (it->second)->checkBuffer();
      }

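      // In attached mode, block until the server has processed the buffer-size
      // messages so that the sizes are exchanged synchronously (see changeset 596).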
      if (0 != parentServer) // attached mode
      {
        while (checkBuffers())
        {
          parentServer->server->listen();
        }
        CContext::setCurrent(context->getId());
      }
    }

    /*!
    If the client is also a server (attached mode), it should process the incoming event
    right away after sending its own event.
    \param [in] ranks List of ranks of the servers connected to this client
    */
    void CContextClient::waitEvent(list<int>& ranks)
    {
      parentServer->server->setPendingEvent();
      while (checkBuffers(ranks))
      {
        parentServer->server->listen();
        parentServer->server->checkPendingRequest();
      }

      while (parentServer->server->hasPendingEvent())
      {
        parentServer->server->eventLoop();
      }
    }

    /*!
    Set up the buffer for each connection to a server and wait until each buffer has enough
    free space to hold its message.
    \param [in] serverList List of ranks of the connected servers
    \param [in] sizeList Size of the message corresponding to each connection
    \return List of output buffers in which the event can be placed
    */
    list<CBufferOut*> CContextClient::getBuffers(list<int>& serverList, list<int>& sizeList)
    {
      list<int>::iterator itServer, itSize;
      list<CClientBuffer*> bufferList;
      map<int,CClientBuffer*>::iterator it;
      list<CClientBuffer*>::iterator itBuffer;
      list<CBufferOut*>  retBuffer;
      bool free;

      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++)
      {
        it = buffers.find(*itServer);
        if (it == buffers.end())
        {
          newBuffer(*itServer);
          it = buffers.find(*itServer);
        }
        bufferList.push_back(it->second);
      }
      free = false;

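      // Busy-wait until every target buffer has enough free space for its
      // message; the time spent here is accounted as "Blocking time".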
      CTimer::get("Blocking time").resume();
      while (!free)
      {
        free = true;
        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
        {
          (*itBuffer)->checkBuffer();
          free &= (*itBuffer)->isBufferFree(*itSize);
        }
      }
      CTimer::get("Blocking time").suspend();

      for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
      {
        retBuffer.push_back((*itBuffer)->getBuffer(*itSize));
      }
      return retBuffer;
    }

   /*!
   Create a new buffer for the connection to the server with the given rank
   \param [in] rank Rank of the connected server
   */
   void CContextClient::newBuffer(int rank)
   {
      buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank]);
   }

   /*!
   Check the state of the buffers: a buffer is pending while it still has data waiting to be sent.
   \return Buffer state: pending (true) or ready (false)
   */
   bool CContextClient::checkBuffers(void)
   {
      map<int,CClientBuffer*>::iterator itBuff;
      bool pending = false;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) pending |= itBuff->second->checkBuffer();
      return pending;
   }

   //! Release all buffers
   void CContextClient::releaseBuffers(void)
   {
      map<int,CClientBuffer*>::iterator itBuff;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) delete itBuff->second;
   }

   /*!
   Check the state of the buffers corresponding to a list of connections
   \param [in] ranks List of ranks of the servers to which the client is connected
   \return Buffer state: pending (true) or ready (false)
   */
   bool CContextClient::checkBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer();
      return pending;
   }

   /*!
   Set the buffer size for each connection and notify the corresponding servers
   \param [in] mapSize Mapping between the rank of a connected server and the size of the allocated buffer
   */
   void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize)
   {
     mapBufferSize_ = mapSize;
     sendBufferSizeEvent();
   }

  /*!
  Get the ranks of the servers led by this client
  \return Ranks of the leading servers
  */
  const std::list<int>& CContextClient::getRanksServerLeader(void) const
  {
    return ranksServerLeader;
  }

  /*!
  Check whether this client is the leader of at least one connected server
  \return true if the client leads one or more servers, false otherwise
  */
  bool CContextClient::isServerLeader(void) const
  {
    return !ranksServerLeader.empty();
  }

   /*!
   Finalize the context client: notify the servers, wait for the pending requests to complete
   and report the buffer memory usage.
   */
   void CContextClient::finalize(void)
   {
     map<int,CClientBuffer*>::iterator itBuff;
     bool stop = true;

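     // Only the server leaders actually push the CONTEXT_FINALIZE message; the
     // other clients still call sendEvent() with an empty event so that the
     // timeLine counter stays in step on every client.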
     CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
     if (isServerLeader())
     {
       CMessage msg;
       const std::list<int>& ranks = getRanksServerLeader();
       for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
         event.push(*itRank, 1, msg);
       sendEvent(event);
     }
     else sendEvent(event);

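     // Wait until every outstanding send request has completed before reporting
     // the buffer usage and releasing the buffers.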
     CTimer::get("Blocking time").resume();
     while (stop)
     {
       checkBuffers();
       stop = false;
       for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop |= itBuff->second->hasPendingRequest();
     }
     CTimer::get("Blocking time").suspend();

     std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                                           iteMap = mapBufferSize_.end(), itMap;
     StdSize totalBuf = 0;
     for (itMap = itbMap; itMap != iteMap; ++itMap)
     {
       report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl
                  << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
       totalBuf += itMap->second;
     }
     report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;

     releaseBuffers();
   }
}