source: XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_client.cpp @ 1547

Last change on this file since 1547 was 1547, checked in by ymipsl, 6 years ago

New communication protocol between clients and servers, using a hybrid mode of point-to-point (p2p) mixed with one-sided communication in order to avoid deadlocks. The constraint on the maximum number of events that can be buffered on the client side is lifted.

Dev branch is created to be tested before merging.

YM
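The general idea of the hybrid protocol can be illustrated with a minimal MPI sketch (illustration only, not the XIOS implementation: the communicator, ranks, buffer sizes and synchronization below are assumptions). A small control message travels point-to-point, while the bulk of the event is read by the server through an RMA window, so the client never blocks waiting for the server to post a matching receive.

  // hybrid_sketch.cpp -- hypothetical example; comm is assumed to contain exactly
  // one client rank and one server rank.
  #include <mpi.h>
  #include <cstring>

  void hybridExchange(MPI_Comm comm, int clientRank, int serverRank, int myRank)
  {
    char payload[1024];                     // client buffer exposed through the window
    char recvBuf[1024];                     // server-side copy of the event
    int  eventSize = 0;

    if (myRank == clientRank)               // fill the payload before exposing it
    {
      eventSize = 6;
      std::memcpy(payload, "hello", eventSize);
    }

    MPI_Win win;                            // window creation is collective over comm
    MPI_Win_create(payload, sizeof(payload), 1, MPI_INFO_NULL, comm, &win);

    if (myRank == clientRank)
    {
      // p2p control message: tell the server how many bytes are ready
      MPI_Send(&eventSize, 1, MPI_INT, serverRank, 0, comm);
    }
    else if (myRank == serverRank)
    {
      MPI_Recv(&eventSize, 1, MPI_INT, clientRank, 0, comm, MPI_STATUS_IGNORE);
      // one-sided read: the client does not post any matching call
      MPI_Win_lock(MPI_LOCK_SHARED, clientRank, 0, win);
      MPI_Get(recvBuf, eventSize, MPI_BYTE, clientRank, 0, eventSize, MPI_BYTE, win);
      MPI_Win_unlock(clientRank, win);
    }
    MPI_Win_free(&win);
  }

In the actual code the buffers are managed by CClientBuffer, and the windows are created per connection in CContextClient::newBuffer() below.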

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 16.1 KB
#include "xios_spl.hpp"
#include "context_client.hpp"
#include "context_server.hpp"
#include "event_client.hpp"
#include "buffer_out.hpp"
#include "buffer_client.hpp"
#include "type.hpp"
#include "context.hpp"
#include "mpi.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "server.hpp"

namespace xios
{
    /*!
    \param [in] parent Pointer to the context on the client side
    \param [in] intraComm_ communicator of the client group
    \param [in] interComm_ communicator of the server group
    \param [in] cxtSer Pointer to the context on the server side (only used in attached mode)
    */
    CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
     : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4)
    {
      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
      if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode

      context = parent;
      intraComm = intraComm_;
      interComm = interComm_;
      MPI_Comm_rank(intraComm, &clientRank);
      MPI_Comm_size(intraComm, &clientSize);

      int flag;
      MPI_Comm_test_inter(interComm, &flag);
      if (flag) MPI_Comm_remote_size(interComm, &serverSize);
      else  MPI_Comm_size(interComm, &serverSize);

      computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader);

      if (flag) MPI_Intercomm_merge(interComm_,false,&interCommMerged) ;

      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ;

      timeLine = 1;
    }

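    /*!
    Compute, for this client, the ranks of the servers for which it acts as the leader
    (rankRecvLeader) and the ranks of the servers it is connected to without being the
    leader (rankRecvNotLeader).
    Illustrative example (hypothetical sizes, not taken from a real run): with
    clientSize = 3 and serverSize = 2, client 0 is the leader of server 0, client 1 is
    connected to server 0 as a non-leader, and client 2 is the leader of server 1.
    \param [in] clientRank rank of this client in the client group
    \param [in] clientSize number of clients
    \param [in] serverSize number of servers
    \param [out] rankRecvLeader ranks of the servers for which this client is the leader
    \param [out] rankRecvNotLeader ranks of the servers to which this client is connected as a non-leader
    */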
    void CContextClient::computeLeader(int clientRank, int clientSize, int serverSize,
                                       std::list<int>& rankRecvLeader,
                                       std::list<int>& rankRecvNotLeader)
    {
      if ((0 == clientSize) || (0 == serverSize)) return;

      if (clientSize < serverSize)
      {
        int serverByClient = serverSize / clientSize;
        int remain = serverSize % clientSize;
        int rankStart = serverByClient * clientRank;

        if (clientRank < remain)
        {
          serverByClient++;
          rankStart += clientRank;
        }
        else
          rankStart += remain;

        for (int i = 0; i < serverByClient; i++)
          rankRecvLeader.push_back(rankStart + i);

        rankRecvNotLeader.resize(0);
      }
      else
      {
        int clientByServer = clientSize / serverSize;
        int remain = clientSize % serverSize;

        if (clientRank < (clientByServer + 1) * remain)
        {
          if (clientRank % (clientByServer + 1) == 0)
            rankRecvLeader.push_back(clientRank / (clientByServer + 1));
          else
            rankRecvNotLeader.push_back(clientRank / (clientByServer + 1));
        }
        else
        {
          int rank = clientRank - (clientByServer + 1) * remain;
          if (rank % clientByServer == 0)
            rankRecvLeader.push_back(remain + rank / clientByServer);
          else
            rankRecvNotLeader.push_back(remain + rank / clientByServer);
        }
      }
    }

    /*!
    Send an event to the connected servers. In attached mode, the current context is
    reset to the client context after the event has been processed.
    \param [in] event Event to be sent to the servers
    */
    void CContextClient::sendEvent(CEventClient& event)
    {
      list<int> ranks = event.getRanks();

      if (CXios::checkEventSync)
      {
        int typeId, classId, typeId_in, classId_in;
        long long timeLine_out;
        typeId_in=event.getTypeId() ;
        classId_in=event.getClassId() ;
//        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
        MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
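        // If every client contributed the same time line, type id and class id, each sum
        // equals clientSize times the common value, so dividing by clientSize recovers it;
        // a mismatch below therefore means the clients are not sending the same event.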
        if (typeId/clientSize!=event.getTypeId() || classId/clientSize!=event.getClassId() || timeLine_out/clientSize!=timeLine)
        {
           ERROR("void CContextClient::sendEvent(CEventClient& event)",
               << "Events are not coherent between clients.");
        }
      }

      if (!event.isEmpty())
      {
        list<int> sizes = event.getSizes();

         // We force the getBuffers call to be non-blocking on classical servers
        list<CBufferOut*> buffList;
//        bool couldBuffer = getBuffers(timeLine, ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0) ));
        getBuffers(timeLine, ranks, sizes, buffList) ;

        event.send(timeLine, sizes, buffList);
        unlockBuffers(ranks) ;

        checkBuffers(ranks);

        if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
        {
          waitEvent(ranks);
          CContext::setCurrent(context->getId());
        }
      }

      timeLine++;
    }

    /*!
    If the client is also a server (attached mode), it should process the incoming
    events right away after sending an event.
    \param [in] ranks list of ranks of the servers connected to this client
    */
    void CContextClient::waitEvent(list<int>& ranks)
    {
      parentServer->server->setPendingEvent();
      while (checkBuffers(ranks))
      {
        parentServer->server->listen();
        parentServer->server->checkPendingRequest();
      }

      while (parentServer->server->hasPendingEvent())
      {
       parentServer->server->eventLoop();
      }
    }

    /*!
     * Get one buffer for each connection to the servers. This function blocks until there is
     * enough room in the buffers, unless it is explicitly requested to be non-blocking.
     *
     * \param [in] timeLine time line of the event which will be sent to the servers
     * \param [in] serverList list of ranks of the connected servers
     * \param [in] sizeList size of the message corresponding to each connection
     * \param [out] retBuffers list of buffers that can be used to store an event
     * \param [in] nonBlocking whether this function should be non-blocking
     * \return whether the already allocated buffers could be used
    */
    bool CContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers,
                                    bool nonBlocking /*= false*/)
    {
      list<int>::const_iterator itServer, itSize;
      list<CClientBuffer*> bufferList;
      map<int,CClientBuffer*>::const_iterator it;
      list<CClientBuffer*>::iterator itBuffer;
      bool areBuffersFree;

      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++)
      {
        it = buffers.find(*itServer);
        if (it == buffers.end())
        {
          newBuffer(*itServer);
          it = buffers.find(*itServer);
        }
        bufferList.push_back(it->second);
      }

      if (CXios::isServer) info(100)<<" getBuffers : entering loop"<<endl ;
      CTimer::get("Blocking time").resume();
      do
      {
        areBuffersFree = true;
        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
        {
          areBuffersFree &= (*itBuffer)->isBufferFree(*itSize);
        }

        if (CXios::isServer) info(100)<<" getBuffers : areBuffersFree ? "<<areBuffersFree<<endl ;
        if (!areBuffersFree)
        {
          for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer();
          if (CXios::isServer) info(100)<<" getBuffers : buffers unlocked "<<endl ;
          checkBuffers();
          if (CXios::isServer) info(100)<<" getBuffers : buffers checked "<<endl ;
          if (CServer::serverLevel == 0)  context->server->listen();
          else if (CServer::serverLevel == 1)
          {
            context->server->listen();
            for (int i = 0; i < context->serverPrimServer.size(); ++i)  context->serverPrimServer[i]->listen();
            CServer::contextEventLoop(false) ; // avoid dead-lock at finalize...
          }
          else if (CServer::serverLevel == 2) context->server->listen();
          if (CXios::isServer) info(100)<<" getBuffers : server listened... "<<endl ;
        }
      } while (!areBuffersFree && !nonBlocking);
      if (CXios::isServer) info(100)<<" getBuffers : out of loop"<<endl ;
      CTimer::get("Blocking time").suspend();

      if (areBuffersFree)
      {
        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
          retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize));
      }
      if (CXios::isServer) info(100)<<" getBuffers : message pushed"<<endl ;
      return areBuffersFree;
   }
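   // Typical usage, as in sendEvent() above: getBuffers() returns the buffers with room
   // reserved for the event; the caller writes the event into them (event.send), then calls
   // unlockBuffers() on the same ranks and checkBuffers() to push the data out. This
   // description of the lock/unlock protocol is inferred from the calling code in this file.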

   /*!
   Create a new buffer for the connection to the server with the given rank.
   \param [in] rank rank of the connected server
   */
   void CContextClient::newBuffer(int rank)
   {
      if (!mapBufferSize_.count(rank))
      {
        error(0) << "WARNING: Unexpected request for buffer to communicate with server " << rank << std::endl;
        mapBufferSize_[rank] = CXios::minBufferSize;
        maxEventSizes[rank] = CXios::minBufferSize;
      }
      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank]);
      // Notify the server of the buffer size
      CBufferOut* bufOut = buffer->getBuffer(0, sizeof(StdSize));
      bufOut->put(mapBufferSize_[rank]);
      buffer->checkBuffer(true);

      if (!isAttachedModeEnabled()) // create windows only in server mode
      {
        MPI_Comm OneSidedInterComm, oneSidedComm ;
        MPI_Intercomm_create(commSelf, 0, interCommMerged, clientSize+rank, 0, &OneSidedInterComm );
        MPI_Intercomm_merge(OneSidedInterComm,false,&oneSidedComm);
        info(100)<<"DEBUG: before creating windows (client)"<<endl ;
        buffer->createWindows(oneSidedComm) ;
        info(100)<<"DEBUG: after creating windows (client)"<<endl ;
      }

   }
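   // Illustrative sketch (not the XIOS implementation; names and synchronization here are
   // assumptions): once a window has been exposed over a communicator shared by the client
   // and one server, the server can read the client's buffer with passive-target RMA while
   // the client keeps working, e.g.:
   //
   //   MPI_Win win;
   //   MPI_Win_create(clientBuf, bufSize, 1, MPI_INFO_NULL, oneSidedComm, &win);  // collective
   //   ...
   //   // server side:
   //   MPI_Win_lock(MPI_LOCK_SHARED, clientRankInComm, 0, win);
   //   MPI_Get(serverCopy, nbBytes, MPI_BYTE, clientRankInComm, 0, nbBytes, MPI_BYTE, win);
   //   MPI_Win_unlock(clientRankInComm, win);
   //
   // The actual window creation for the XIOS buffers happens inside CClientBuffer::createWindows().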

   /*!
   Verify the state of the buffers. A buffer is pending if there is no message on it.
   \return state of the buffers: pending (true), ready (false)
   */
   bool CContextClient::checkBuffers(void)
   {
      map<int,CClientBuffer*>::iterator itBuff;
      bool pending = false;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
        pending |= itBuff->second->checkBuffer(!pureOneSided);
      return pending;
   }

   //! Release all buffers
   void CContextClient::releaseBuffers()
   {
      map<int,CClientBuffer*>::iterator itBuff;
      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      {
//          CEventClient event(CContext::GetType(), CContext::EVENT_ID_CLOSE_P2P_CHANNEL);
//          CMessage msg;
//          event.push(itBuff->first, 1, msg);
//          timeLine = std::numeric_limits<size_t>::max() ;
//          sendEvent(event);
//          while (itBuff->second->checkBuffer(!pureOneSided));
          delete itBuff->second;
      }
      buffers.clear();
   }

  /*!
   Lock the buffers used for one-sided communications.
   \param [in] ranks list of ranks of the servers to which the client is connected
   */
   void CContextClient::lockBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->lockBuffer();
   }

  /*!
   Unlock the buffers used for one-sided communications.
   \param [in] ranks list of ranks of the servers to which the client is connected
   */
   void CContextClient::unlockBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->unlockBuffer();
   }

   /*!
   Verify the state of the buffers corresponding to a set of connections.
   \param [in] ranks list of ranks of the servers to which the client is connected
   \return state of the buffers: pending (true), ready (false)
   */
   bool CContextClient::checkBuffers(list<int>& ranks)
   {
      list<int>::iterator it;
      bool pending = false;
      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(!pureOneSided);
      return pending;
   }

   /*!
    * Set the buffer size for each connection. Warning: this function is collective.
    *
    * \param [in] mapSize maps the rank of each connected server to the size of the corresponding buffer
    * \param [in] maxEventSize maps the rank of each connected server to the size of the biggest event
   */
   void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)
   {
     mapBufferSize_ = mapSize;
     maxEventSizes = maxEventSize;
   }

  /*!
  Get the connected servers for which this client is not the leader.
  \return ranks of the non-leading servers
  */
  const std::list<int>& CContextClient::getRanksServerNotLeader(void) const
  {
    return ranksServerNotLeader;
  }

  /*!
  Check if the client is connected to at least one server for which it is not the leader.
  \return connected (true), not connected (false)
  */
  bool CContextClient::isServerNotLeader(void) const
  {
    return !ranksServerNotLeader.empty();
  }

  /*!
  Get the connected servers for which this client is the leader.
  \return ranks of the leading servers
  */
  const std::list<int>& CContextClient::getRanksServerLeader(void) const
  {
    return ranksServerLeader;
  }

  /*!
  Check if the client is the leader of at least one connected server.
  \return leader (true), not leader (false)
  */
  bool CContextClient::isServerLeader(void) const
  {
    return !ranksServerLeader.empty();
  }

  /*!
   * Check if the attached mode is used.
   *
   * \return true if and only if attached mode is used
   */
  bool CContextClient::isAttachedModeEnabled() const
  {
    return (parentServer != 0);
  }

   /*!
   * Finalize the context client and report the memory used by the buffers. This function is non-blocking.
   */
  void CContextClient::finalize(void)
  {
    map<int,CClientBuffer*>::iterator itBuff;
    std::list<int>::iterator ItServerLeader;

    bool stop = false;

    int* nbServerConnectionLocal  = new int[serverSize] ;
    int* nbServerConnectionGlobal  = new int[serverSize] ;
    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)  nbServerConnectionLocal[itBuff->first]=1 ;
    for (ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++)  nbServerConnectionLocal[*ItServerLeader]=1 ;

    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm);

    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
    CMessage msg;

    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ;
    sendEvent(event);

    delete[] nbServerConnectionLocal ;
    delete[] nbServerConnectionGlobal ;
/*
    if (isServerLeader())
    {
      CMessage msg;
      const std::list<int>& ranks = getRanksServerLeader();
      for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
      {
        info(100)<<"DEBUG : Sent context Finalize event to rank "<<*itRank<<endl ;
        event.push(*itRank, 1, msg);
      }
      sendEvent(event);
    }
    else sendEvent(event);
*/

    CTimer::get("Blocking time").resume();
    checkBuffers();
    CTimer::get("Blocking time").suspend();

    std::map<int,StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                                          iteMap = mapBufferSize_.end(), itMap;

    StdSize totalBuf = 0;
    for (itMap = itbMap; itMap != iteMap; ++itMap)
    {
      report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl
                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
      totalBuf += itMap->second;
    }
    report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl;

    //releaseBuffers(); // moved to CContext::finalize()
  }


  /*!
  Check whether any buffer still has a pending request.
  \return true if at least one buffer has a pending request
  */
  bool CContextClient::havePendingRequests(void)
  {
    bool pending = false;
    map<int,CClientBuffer*>::iterator itBuff;
    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
      pending |= itBuff->second->hasPendingRequest();
    return pending;
  }


}