source: XIOS/trunk/src/context_client.cpp @ 593

Last change on this file since 593 was 591, checked in by rlacroix, 9 years ago

Remove leftovers from the XMLIO age.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 9.6 KB
RevLine 
[591]1#include "xios_spl.hpp"
[300]2#include "context_client.hpp"
3#include "context_server.hpp"
4#include "event_client.hpp"
5#include "buffer_out.hpp"
6#include "buffer_client.hpp"
7#include "type.hpp"
8#include "message.hpp"
9#include "event_client.hpp"
10#include "context.hpp"
[382]11#include "mpi.hpp"
[347]12#include "timer.hpp"
[401]13#include "cxios.hpp"
[300]14
[335]15namespace xios
[300]16{
[512]17    /*!
18    \param [in] parent Pointer to context on client side
19    \param [in] intraComm_ communicator of group client
20    \param [in] interComm_ communicator of group server
21    \cxtSer [in] cxtSer Pointer to context of server side. (It is only used on case of attached mode)
22    */
[511]23    CContextClient::CContextClient(CContext* parent,MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
24     : mapBufferSize_(), parentServer(cxtSer)
[300]25    {
26      context=parent ;
27      intraComm=intraComm_ ;
28      interComm=interComm_ ;
29      MPI_Comm_rank(intraComm,&clientRank) ;
30      MPI_Comm_size(intraComm,&clientSize) ;
[509]31
[300]32      int flag ;
33      MPI_Comm_test_inter(interComm,&flag) ;
34      if (flag) MPI_Comm_remote_size(interComm,&serverSize);
35      else  MPI_Comm_size(interComm,&serverSize) ;
[509]36
[300]37      timeLine=0 ;
38
39    }
40
[512]41    /*!
42    In case of attached mode, the current context must be reset to context for client
43    \param [in] event Event sent to server
44    */
[300]45    void CContextClient::sendEvent(CEventClient& event)
46    {
47      list<int>::iterator itServer ;
48      list<int> ranks ;
[509]49      list<int> sizes ;
[300]50      list<int>::iterator itSize ;
[509]51
[300]52      ranks=event.getRanks() ;
53      if (! event.isEmpty())
54      {
55        sizes=event.getSizes() ;
56        CMessage msg ;
57
58        msg<<*(sizes.begin())<<timeLine ;
59        for(list<int>::iterator it=sizes.begin();it!=sizes.end();it++) *it+=msg.size() ;
60        list<CBufferOut*> buffList=getBuffers(ranks,sizes) ;
[509]61
62        list<CBufferOut*>::iterator it ;
[300]63        for(it=buffList.begin(),itSize=sizes.begin();it!=buffList.end();++it,++itSize)
64        {
65          **it<<*itSize<<timeLine ;
66        }
67        event.send(buffList) ;
68        checkBuffers(ranks) ;
69      }
70
[511]71//      if (context->hasServer)
72      if (0 != parentServer)
73      {
74        waitEvent(ranks);
75        CContext::setCurrent(context->getId());
76      }
77
[300]78      timeLine++ ;
79    }
[509]80
[512]81    /*!
82    Special function to setup size of buffer not only on client side but also on server side
83    corresponding to the connection
84    */
[509]85    void CContextClient::sendBufferSizeEvent()
86    {
87      std::map<int, CClientBuffer*>::iterator it, itE;
88      std::map<int, StdSize>::const_iterator itMap = mapBufferSize_.begin(), iteMap = mapBufferSize_.end();
89
90      if (itMap == iteMap)
91         ERROR("CBufferOut*  CContextClient::sendBufferSizeEvent() ;",
92              <<"No information about server buffer, that should not happen...");
93
[512]94      for (; itMap != iteMap; ++itMap)
[509]95      {
96        if (buffers.end() == buffers.find(itMap->first))
97          newBuffer(itMap->first);
98      }
99
100      CBufferOut* bufOut(NULL);
101      itE = buffers.end();
102      for (it = buffers.begin(); it != itE; ++it)
103      {
104        bufOut = (it->second)->getBuffer(sizeof(StdSize));
105        bufOut->put(mapBufferSize_[it->first]);  // Stupid C++
106        (it->second)->checkBuffer();
107      }
108    }
109
[512]110    /*!
111    If client is also server (attached mode), after sending event, it should process right away
112    the incoming event.
113    \param [in] ranks list rank of server connected this client
114    */
[300]115    void CContextClient::waitEvent(list<int>& ranks)
116    {
[511]117//      context->server->setPendingEvent() ;
118//      while(checkBuffers(ranks))
119//      {
120//        context->server->listen() ;
121//        context->server->checkPendingRequest() ;
122//      }
123//
124//      while(context->server->hasPendingEvent())
125//      {
126//       context->server->eventLoop() ;
127//      }
128
129      parentServer->server->setPendingEvent() ;
[386]130      while(checkBuffers(ranks))
[300]131      {
[511]132        parentServer->server->listen() ;
133        parentServer->server->checkPendingRequest() ;
[300]134      }
[386]135
[511]136      while(parentServer->server->hasPendingEvent())
[386]137      {
[511]138       parentServer->server->eventLoop() ;
[386]139      }
[300]140    }
141
[512]142    /*!
143    Setup buffer for each connection to server and verify their state to put content into them
144    \param [in] serverList list of rank of connected server
145    \param [in] sizeList size of message corresponding to each connection
146    \return List of buffer input which event can be placed
147    */
[300]148    list<CBufferOut*> CContextClient::getBuffers(list<int>& serverList, list<int>& sizeList)
149    {
150      list<int>::iterator itServer,itSize ;
[509]151      list<CClientBuffer*> bufferList ;
152      map<int,CClientBuffer*>::iterator it ;
153      list<CClientBuffer*>::iterator itBuffer ;
[300]154      list<CBufferOut*>  retBuffer ;
155      bool free ;
156
[509]157      for(itServer=serverList.begin();itServer!=serverList.end();itServer++)
[300]158      {
159        it=buffers.find(*itServer) ;
[509]160        if (it==buffers.end())
[300]161        {
162          newBuffer(*itServer) ;
163          it=buffers.find(*itServer) ;
[509]164        }
[300]165        bufferList.push_back(it->second) ;
166      }
167      free=false ;
[347]168
169      CTimer::get("Blocking time").resume();
[300]170      while(!free)
171      {
172        free=true ;
173        for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++)
174        {
175          (*itBuffer)->checkBuffer() ;
176         free&=(*itBuffer)->isBufferFree(*itSize) ;
177        }
178      }
[347]179      CTimer::get("Blocking time").suspend();
180
[300]181      for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++)
182      {
183        retBuffer.push_back((*itBuffer)->getBuffer(*itSize)) ;
184      }
[509]185      return retBuffer ;
186
[300]187   }
[509]188
[512]189   /*!
190   Make a new buffer for a certain connection to server with specific rank
191   \param [in] rank rank of connected server
192   */
[300]193   void CContextClient::newBuffer(int rank)
194   {
[509]195      buffers[rank]=new CClientBuffer(interComm,rank, mapBufferSize_[rank]) ;
196   }
[300]197
[512]198   /*!
199   Verify state of buffers. Buffer is under pending state if there is no message on it
200   \return state of buffers, pending(true), ready(false)
201   */
[300]202   bool CContextClient::checkBuffers(void)
203   {
204      map<int,CClientBuffer*>::iterator itBuff ;
205      bool pending=false ;
206      for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) pending|=itBuff->second->checkBuffer() ;
207      return pending ;
[509]208   }
[300]209
[512]210   //! Release all buffers
[300]211   void CContextClient::releaseBuffers(void)
212   {
213      map<int,CClientBuffer*>::iterator itBuff ;
214      for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) delete itBuff->second ;
[509]215   }
[300]216
[512]217   /*!
218   Verify state of buffers corresponding to a connection
219   \param [in] ranks list rank of server to which client connects to
220   \return state of buffers, pending(true), ready(false)
221   */
[300]222   bool CContextClient::checkBuffers(list<int>& ranks)
223   {
224      list<int>::iterator it ;
225      bool pending=false ;
226      for(it=ranks.begin();it!=ranks.end();it++) pending|=buffers[*it]->checkBuffer() ;
227      return pending ;
[509]228   }
[300]229
[512]230   /*!
231   Set buffer size for each connection
232   \param [in] mapSize mapping rank of connected server to size of allocated buffer
233   */
[509]234   void CContextClient::setBufferSize(const std::map<int, StdSize>& mapSize)
235   {
236     mapBufferSize_ = mapSize;
237     sendBufferSizeEvent();
238   }
239
[512]240   /*!
241   Get leading server in the group of connected server
242   \return rank of leading server
243   */
[300]244   int CContextClient::getServerLeader(void)
245   {
246     int clientByServer=clientSize/serverSize ;
247     int remain=clientSize%serverSize ;
[509]248
[300]249     if (clientRank<(clientByServer+1)*remain)
250     {
251       return clientRank/(clientByServer+1) ;
252     }
253     else
254     {
255       int rank=clientRank-(clientByServer+1)*remain ;
256       int nbServer=serverSize-remain ;
257       return remain+rank/clientByServer ;
258     }
[509]259   }
[300]260
[512]261   /*!
262   Check if client connects to leading server
263   \return connected(true), not connected (false)
264   */
[300]265   bool CContextClient::isServerLeader(void)
266   {
267     int clientByServer=clientSize/serverSize ;
268     int remain=clientSize%serverSize ;
[509]269
[300]270     if (clientRank<(clientByServer+1)*remain)
271     {
272       if (clientRank%(clientByServer+1)==0) return true ;
273       else return false ;
274     }
275     else
276     {
277       int rank=clientRank-(clientByServer+1)*remain ;
278       int nbServer=serverSize-remain ;
279       if  (rank%clientByServer==0) return true ;
280       else return false ;
[509]281     }
[300]282   }
[509]283
[512]284   /*!
285   Finalize context client and do some reports
286   */
[300]287   void CContextClient::finalize(void)
288   {
289     map<int,CClientBuffer*>::iterator itBuff ;
290     bool stop=true ;
291
[509]292     CEventClient event(CContext::GetType(),CContext::EVENT_ID_CONTEXT_FINALIZE) ;
[300]293     if (isServerLeader())
294     {
295       CMessage msg ;
296       event.push(getServerLeader(),1,msg) ;
[419]297       sendEvent(event) ;
[300]298     }
[419]299     else sendEvent(event) ;
[509]300
[347]301     CTimer::get("Blocking time").resume();
[300]302     while(stop)
303     {
304       checkBuffers() ;
305       stop=false ;
306       for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) stop|=itBuff->second->hasPendingRequest() ;
307     }
[347]308     CTimer::get("Blocking time").suspend();
[509]309
[511]310     std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
311                                            iteMap = mapBufferSize_.end(), itMap;
312     StdSize totalBuf = 0;
313     for (itMap = itbMap; itMap != iteMap; ++itMap)
314     {
315       report(10)<< " Memory report : Context <"<<context->getId()<<"> : client side : memory used for buffer of each connection to server" << endl
[512]316                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
[511]317       totalBuf += itMap->second;
318     }
319     report(0)<< " Memory report : Context <"<<context->getId()<<"> : client side : total memory used for buffer "<<totalBuf<<" bytes"<<endl ;
320
[300]321     releaseBuffers() ;
322   }
[509]323}
Note: See TracBrowser for help on using the repository browser.