source: XIOS/trunk/src/context_client.cpp @ 539

Last change on this file since 539 was 512, checked in by mhnguyen, 9 years ago

Correct bug causing infinite run on multiple server and do some code clean

+) Correct a stupid typo which causes the bug
+) Add comments for class Contextclient
+) Remove some redundant codes

Test
+) On Curie
+) Connection mode: Attached: One and Multiple; Separated: One and Multiple server
+) File mode: One and multiple file
+) All tests passed (Will it make the trusting board become red :)? )

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 9.6 KB
Line 
1#include "xmlioserver_spl.hpp"
2#include "context_client.hpp"
3#include "context_server.hpp"
4#include "event_client.hpp"
5#include "buffer_out.hpp"
6#include "buffer_client.hpp"
7#include "type.hpp"
8#include "message.hpp"
9#include "event_client.hpp"
10#include "context.hpp"
11#include "mpi.hpp"
12#include "timer.hpp"
13#include "cxios.hpp"
14
15namespace xios
16{
17    /*!
18    \param [in] parent Pointer to context on client side
19    \param [in] intraComm_ communicator of group client
20    \param [in] interComm_ communicator of group server
21    \cxtSer [in] cxtSer Pointer to context of server side. (It is only used on case of attached mode)
22    */
23    CContextClient::CContextClient(CContext* parent,MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer)
24     : mapBufferSize_(), parentServer(cxtSer)
25    {
26      context=parent ;
27      intraComm=intraComm_ ;
28      interComm=interComm_ ;
29      MPI_Comm_rank(intraComm,&clientRank) ;
30      MPI_Comm_size(intraComm,&clientSize) ;
31
32      int flag ;
33      MPI_Comm_test_inter(interComm,&flag) ;
34      if (flag) MPI_Comm_remote_size(interComm,&serverSize);
35      else  MPI_Comm_size(interComm,&serverSize) ;
36
37      timeLine=0 ;
38
39    }
40
41    /*!
42    In case of attached mode, the current context must be reset to context for client
43    \param [in] event Event sent to server
44    */
45    void CContextClient::sendEvent(CEventClient& event)
46    {
47      list<int>::iterator itServer ;
48      list<int> ranks ;
49      list<int> sizes ;
50      list<int>::iterator itSize ;
51
52      ranks=event.getRanks() ;
53      if (! event.isEmpty())
54      {
55        sizes=event.getSizes() ;
56        CMessage msg ;
57
58        msg<<*(sizes.begin())<<timeLine ;
59        for(list<int>::iterator it=sizes.begin();it!=sizes.end();it++) *it+=msg.size() ;
60        list<CBufferOut*> buffList=getBuffers(ranks,sizes) ;
61
62        list<CBufferOut*>::iterator it ;
63        for(it=buffList.begin(),itSize=sizes.begin();it!=buffList.end();++it,++itSize)
64        {
65          **it<<*itSize<<timeLine ;
66        }
67        event.send(buffList) ;
68        checkBuffers(ranks) ;
69      }
70
71//      if (context->hasServer)
72      if (0 != parentServer)
73      {
74        waitEvent(ranks);
75        CContext::setCurrent(context->getId());
76      }
77
78      timeLine++ ;
79    }
80
81    /*!
82    Special function to setup size of buffer not only on client side but also on server side
83    corresponding to the connection
84    */
85    void CContextClient::sendBufferSizeEvent()
86    {
87      std::map<int, CClientBuffer*>::iterator it, itE;
88      std::map<int, StdSize>::const_iterator itMap = mapBufferSize_.begin(), iteMap = mapBufferSize_.end();
89
90      if (itMap == iteMap)
91         ERROR("CBufferOut*  CContextClient::sendBufferSizeEvent() ;",
92              <<"No information about server buffer, that should not happen...");
93
94      for (; itMap != iteMap; ++itMap)
95      {
96        if (buffers.end() == buffers.find(itMap->first))
97          newBuffer(itMap->first);
98      }
99
100      CBufferOut* bufOut(NULL);
101      itE = buffers.end();
102      for (it = buffers.begin(); it != itE; ++it)
103      {
104        bufOut = (it->second)->getBuffer(sizeof(StdSize));
105        bufOut->put(mapBufferSize_[it->first]);  // Stupid C++
106        (it->second)->checkBuffer();
107      }
108    }
109
110    /*!
111    If client is also server (attached mode), after sending event, it should process right away
112    the incoming event.
113    \param [in] ranks list rank of server connected this client
114    */
115    void CContextClient::waitEvent(list<int>& ranks)
116    {
117//      context->server->setPendingEvent() ;
118//      while(checkBuffers(ranks))
119//      {
120//        context->server->listen() ;
121//        context->server->checkPendingRequest() ;
122//      }
123//
124//      while(context->server->hasPendingEvent())
125//      {
126//       context->server->eventLoop() ;
127//      }
128
129      parentServer->server->setPendingEvent() ;
130      while(checkBuffers(ranks))
131      {
132        parentServer->server->listen() ;
133        parentServer->server->checkPendingRequest() ;
134      }
135
136      while(parentServer->server->hasPendingEvent())
137      {
138       parentServer->server->eventLoop() ;
139      }
140    }
141
142    /*!
143    Setup buffer for each connection to server and verify their state to put content into them
144    \param [in] serverList list of rank of connected server
145    \param [in] sizeList size of message corresponding to each connection
146    \return List of buffer input which event can be placed
147    */
148    list<CBufferOut*> CContextClient::getBuffers(list<int>& serverList, list<int>& sizeList)
149    {
150      list<int>::iterator itServer,itSize ;
151      list<CClientBuffer*> bufferList ;
152      map<int,CClientBuffer*>::iterator it ;
153      list<CClientBuffer*>::iterator itBuffer ;
154      list<CBufferOut*>  retBuffer ;
155      bool free ;
156
157      for(itServer=serverList.begin();itServer!=serverList.end();itServer++)
158      {
159        it=buffers.find(*itServer) ;
160        if (it==buffers.end())
161        {
162          newBuffer(*itServer) ;
163          it=buffers.find(*itServer) ;
164        }
165        bufferList.push_back(it->second) ;
166      }
167      free=false ;
168
169      CTimer::get("Blocking time").resume();
170      while(!free)
171      {
172        free=true ;
173        for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++)
174        {
175          (*itBuffer)->checkBuffer() ;
176         free&=(*itBuffer)->isBufferFree(*itSize) ;
177        }
178      }
179      CTimer::get("Blocking time").suspend();
180
181      for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++)
182      {
183        retBuffer.push_back((*itBuffer)->getBuffer(*itSize)) ;
184      }
185      return retBuffer ;
186
187   }
188
189   /*!
190   Make a new buffer for a certain connection to server with specific rank
191   \param [in] rank rank of connected server
192   */
193   void CContextClient::newBuffer(int rank)
194   {
195      buffers[rank]=new CClientBuffer(interComm,rank, mapBufferSize_[rank]) ;
196   }
197
198   /*!
199   Verify state of buffers. Buffer is under pending state if there is no message on it
200   \return state of buffers, pending(true), ready(false)
201   */
202   bool CContextClient::checkBuffers(void)
203   {
204      map<int,CClientBuffer*>::iterator itBuff ;
205      bool pending=false ;
206      for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) pending|=itBuff->second->checkBuffer() ;
207      return pending ;
208   }
209
210   //! Release all buffers
211   void CContextClient::releaseBuffers(void)
212   {
213      map<int,CClientBuffer*>::iterator itBuff ;
214      for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) delete itBuff->second ;
215   }
216
217   /*!
218   Verify state of buffers corresponding to a connection
219   \param [in] ranks list rank of server to which client connects to
220   \return state of buffers, pending(true), ready(false)
221   */
222   bool CContextClient::checkBuffers(list<int>& ranks)
223   {
224      list<int>::iterator it ;
225      bool pending=false ;
226      for(it=ranks.begin();it!=ranks.end();it++) pending|=buffers[*it]->checkBuffer() ;
227      return pending ;
228   }
229
230   /*!
231   Set buffer size for each connection
232   \param [in] mapSize mapping rank of connected server to size of allocated buffer
233   */
234   void CContextClient::setBufferSize(const std::map<int, StdSize>& mapSize)
235   {
236     mapBufferSize_ = mapSize;
237     sendBufferSizeEvent();
238   }
239
240   /*!
241   Get leading server in the group of connected server
242   \return rank of leading server
243   */
244   int CContextClient::getServerLeader(void)
245   {
246     int clientByServer=clientSize/serverSize ;
247     int remain=clientSize%serverSize ;
248
249     if (clientRank<(clientByServer+1)*remain)
250     {
251       return clientRank/(clientByServer+1) ;
252     }
253     else
254     {
255       int rank=clientRank-(clientByServer+1)*remain ;
256       int nbServer=serverSize-remain ;
257       return remain+rank/clientByServer ;
258     }
259   }
260
261   /*!
262   Check if client connects to leading server
263   \return connected(true), not connected (false)
264   */
265   bool CContextClient::isServerLeader(void)
266   {
267     int clientByServer=clientSize/serverSize ;
268     int remain=clientSize%serverSize ;
269
270     if (clientRank<(clientByServer+1)*remain)
271     {
272       if (clientRank%(clientByServer+1)==0) return true ;
273       else return false ;
274     }
275     else
276     {
277       int rank=clientRank-(clientByServer+1)*remain ;
278       int nbServer=serverSize-remain ;
279       if  (rank%clientByServer==0) return true ;
280       else return false ;
281     }
282   }
283
284   /*!
285   Finalize context client and do some reports
286   */
287   void CContextClient::finalize(void)
288   {
289     map<int,CClientBuffer*>::iterator itBuff ;
290     bool stop=true ;
291
292     CEventClient event(CContext::GetType(),CContext::EVENT_ID_CONTEXT_FINALIZE) ;
293     if (isServerLeader())
294     {
295       CMessage msg ;
296       event.push(getServerLeader(),1,msg) ;
297       sendEvent(event) ;
298     }
299     else sendEvent(event) ;
300
301     CTimer::get("Blocking time").resume();
302     while(stop)
303     {
304       checkBuffers() ;
305       stop=false ;
306       for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) stop|=itBuff->second->hasPendingRequest() ;
307     }
308     CTimer::get("Blocking time").suspend();
309
310     std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
311                                            iteMap = mapBufferSize_.end(), itMap;
312     StdSize totalBuf = 0;
313     for (itMap = itbMap; itMap != iteMap; ++itMap)
314     {
315       report(10)<< " Memory report : Context <"<<context->getId()<<"> : client side : memory used for buffer of each connection to server" << endl
316                 << "  +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl;
317       totalBuf += itMap->second;
318     }
319     report(0)<< " Memory report : Context <"<<context->getId()<<"> : client side : total memory used for buffer "<<totalBuf<<" bytes"<<endl ;
320
321     releaseBuffers() ;
322   }
323}
Note: See TracBrowser for help on using the repository browser.