source: XIOS/trunk/src/context_client.cpp @ 510

Last change on this file since 510 was 509, checked in by mhnguyen, 10 years ago

Implementing buffer size auto-detection for mode client -server

+) Process xml tree in client side then send all the information to server
+) Only information enabled fields in enabled files are sent to server
+) Some important change in structure of code which must be refactored

Test
+) On Curie
+) Only mode client-server
+) Passed for all tests

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 6.7 KB
Line 
1#include "xmlioserver_spl.hpp"
2#include "context_client.hpp"
3#include "context_server.hpp"
4#include "event_client.hpp"
5#include "buffer_out.hpp"
6#include "buffer_client.hpp"
7#include "type.hpp"
8#include "message.hpp"
9#include "event_client.hpp"
10#include "context.hpp"
11#include "mpi.hpp"
12#include "timer.hpp"
13#include "cxios.hpp"
14
15namespace xios
16{
17
18
19    CContextClient::CContextClient(CContext* parent,MPI_Comm intraComm_, MPI_Comm interComm_) : mapBufferSize_()
20    {
21      context=parent ;
22      intraComm=intraComm_ ;
23      interComm=interComm_ ;
24      MPI_Comm_rank(intraComm,&clientRank) ;
25      MPI_Comm_size(intraComm,&clientSize) ;
26
27      int flag ;
28      MPI_Comm_test_inter(interComm,&flag) ;
29      if (flag) MPI_Comm_remote_size(interComm,&serverSize);
30      else  MPI_Comm_size(interComm,&serverSize) ;
31
32      timeLine=0 ;
33
34    }
35
36
37    void CContextClient::sendEvent(CEventClient& event)
38    {
39      list<int>::iterator itServer ;
40      list<int> ranks ;
41      list<int> sizes ;
42      list<int>::iterator itSize ;
43
44      ranks=event.getRanks() ;
45      if (! event.isEmpty())
46      {
47        sizes=event.getSizes() ;
48        CMessage msg ;
49
50        msg<<*(sizes.begin())<<timeLine ;
51        for(list<int>::iterator it=sizes.begin();it!=sizes.end();it++) *it+=msg.size() ;
52        list<CBufferOut*> buffList=getBuffers(ranks,sizes) ;
53
54        list<CBufferOut*>::iterator it ;
55        for(it=buffList.begin(),itSize=sizes.begin();it!=buffList.end();++it,++itSize)
56        {
57          **it<<*itSize<<timeLine ;
58        }
59        event.send(buffList) ;
60        checkBuffers(ranks) ;
61      }
62
63      if (context->hasServer) waitEvent(ranks) ;
64      timeLine++ ;
65    }
66
67    void CContextClient::sendBufferSizeEvent()
68    {
69      std::map<int, CClientBuffer*>::iterator it, itE;
70      std::map<int, StdSize>::const_iterator itMap = mapBufferSize_.begin(), iteMap = mapBufferSize_.end();
71
72      if (itMap == iteMap)
73         ERROR("CBufferOut*  CContextClient::sendBufferSizeEvent() ;",
74              <<"No information about server buffer, that should not happen...");
75
76      for (; itMap != iteMap; ++iteMap)
77      {
78        if (buffers.end() == buffers.find(itMap->first))
79          newBuffer(itMap->first);
80      }
81
82      CBufferOut* bufOut(NULL);
83      itE = buffers.end();
84      for (it = buffers.begin(); it != itE; ++it)
85      {
86        bufOut = (it->second)->getBuffer(sizeof(StdSize));
87        bufOut->put(mapBufferSize_[it->first]);  // Stupid C++
88        (it->second)->checkBuffer();
89      }
90    }
91
92    void CContextClient::waitEvent(list<int>& ranks)
93    {
94      context->server->setPendingEvent() ;
95      while(checkBuffers(ranks))
96      {
97        context->server->listen() ;
98        context->server->checkPendingRequest() ;
99      }
100
101      while(context->server->hasPendingEvent())
102      {
103       context->server->eventLoop() ;
104      }
105
106    }
107
108    list<CBufferOut*> CContextClient::getBuffers(list<int>& serverList, list<int>& sizeList)
109    {
110      list<int>::iterator itServer,itSize ;
111      list<CClientBuffer*> bufferList ;
112      map<int,CClientBuffer*>::iterator it ;
113      list<CClientBuffer*>::iterator itBuffer ;
114      list<CBufferOut*>  retBuffer ;
115      bool free ;
116
117      for(itServer=serverList.begin();itServer!=serverList.end();itServer++)
118      {
119        it=buffers.find(*itServer) ;
120        if (it==buffers.end())
121        {
122          newBuffer(*itServer) ;
123          it=buffers.find(*itServer) ;
124        }
125        bufferList.push_back(it->second) ;
126      }
127      free=false ;
128
129      CTimer::get("Blocking time").resume();
130      while(!free)
131      {
132        free=true ;
133        for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++)
134        {
135          (*itBuffer)->checkBuffer() ;
136         free&=(*itBuffer)->isBufferFree(*itSize) ;
137        }
138      }
139      CTimer::get("Blocking time").suspend();
140
141      for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++)
142      {
143        retBuffer.push_back((*itBuffer)->getBuffer(*itSize)) ;
144      }
145      return retBuffer ;
146
147   }
148
149   void CContextClient::newBuffer(int rank)
150   {
151//     buffers[rank]=new CClientBuffer(interComm,rank);
152      buffers[rank]=new CClientBuffer(interComm,rank, mapBufferSize_[rank]) ;
153   }
154
155   bool CContextClient::checkBuffers(void)
156   {
157      map<int,CClientBuffer*>::iterator itBuff ;
158      bool pending=false ;
159      for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) pending|=itBuff->second->checkBuffer() ;
160      return pending ;
161   }
162
163   void CContextClient::releaseBuffers(void)
164   {
165      map<int,CClientBuffer*>::iterator itBuff ;
166      for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) delete itBuff->second ;
167   }
168
169   bool CContextClient::checkBuffers(list<int>& ranks)
170   {
171      list<int>::iterator it ;
172      bool pending=false ;
173      for(it=ranks.begin();it!=ranks.end();it++) pending|=buffers[*it]->checkBuffer() ;
174      return pending ;
175   }
176
177   void CContextClient::setBufferSize(const std::map<int, StdSize>& mapSize)
178   {
179     mapBufferSize_ = mapSize;
180     sendBufferSizeEvent();
181   }
182
183   int CContextClient::getServerLeader(void)
184   {
185     int clientByServer=clientSize/serverSize ;
186     int remain=clientSize%serverSize ;
187
188     if (clientRank<(clientByServer+1)*remain)
189     {
190       return clientRank/(clientByServer+1) ;
191     }
192     else
193     {
194       int rank=clientRank-(clientByServer+1)*remain ;
195       int nbServer=serverSize-remain ;
196       return remain+rank/clientByServer ;
197     }
198   }
199
200   bool CContextClient::isServerLeader(void)
201   {
202     int clientByServer=clientSize/serverSize ;
203     int remain=clientSize%serverSize ;
204
205     if (clientRank<(clientByServer+1)*remain)
206     {
207       if (clientRank%(clientByServer+1)==0) return true ;
208       else return false ;
209     }
210     else
211     {
212       int rank=clientRank-(clientByServer+1)*remain ;
213       int nbServer=serverSize-remain ;
214       if  (rank%clientByServer==0) return true ;
215       else return false ;
216     }
217   }
218
219   void CContextClient::finalize(void)
220   {
221
222     map<int,CClientBuffer*>::iterator itBuff ;
223     bool stop=true ;
224
225     CEventClient event(CContext::GetType(),CContext::EVENT_ID_CONTEXT_FINALIZE) ;
226     if (isServerLeader())
227     {
228       CMessage msg ;
229       event.push(getServerLeader(),1,msg) ;
230       sendEvent(event) ;
231     }
232     else sendEvent(event) ;
233
234     CTimer::get("Blocking time").resume();
235     while(stop)
236     {
237       checkBuffers() ;
238       stop=false ;
239       for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) stop|=itBuff->second->hasPendingRequest() ;
240     }
241     CTimer::get("Blocking time").suspend();
242     report(0)<< " Memory report : Context <"<<context->getId()<<"> : client side : total memory used for buffer "<<buffers.size()*CXios::bufferSize<<" bytes"<<endl ;
243
244     releaseBuffers() ;
245   }
246}
Note: See TracBrowser for help on using the repository browser.