source: XIOS/trunk/src/context_client.cpp @ 401

Last change on this file since 401 was 401, checked in by ymipsl, 12 years ago
  • Implement mechanism for tracking memory leak
  • Correct important memory leaks
  • Add complementary report on memory consumption

YM

  • Property svn:eol-style set to native
File size: 5.7 KB
Line 
1#include "xmlioserver_spl.hpp"
2#include "context_client.hpp"
3#include "context_server.hpp"
4#include "event_client.hpp"
5#include "buffer_out.hpp"
6#include "buffer_client.hpp"
7#include "type.hpp"
8#include "message.hpp"
9#include "event_client.hpp"
10#include "context.hpp"
11#include "mpi.hpp"
12#include "timer.hpp"
13#include "cxios.hpp"
14
15namespace xios
16{
17
18
19    CContextClient::CContextClient(CContext* parent,MPI_Comm intraComm_, MPI_Comm interComm_)
20    {
21      context=parent ;
22      intraComm=intraComm_ ;
23      interComm=interComm_ ;
24      MPI_Comm_rank(intraComm,&clientRank) ;
25      MPI_Comm_size(intraComm,&clientSize) ;
26     
27      int flag ;
28      MPI_Comm_test_inter(interComm,&flag) ;
29      if (flag) MPI_Comm_remote_size(interComm,&serverSize);
30      else  MPI_Comm_size(interComm,&serverSize) ;
31     
32      timeLine=0 ;
33
34    }
35
36
37    void CContextClient::sendEvent(CEventClient& event)
38    {
39      list<int>::iterator itServer ;
40      list<int> ranks ;
41      list<int> sizes ; 
42      list<int>::iterator itSize ;
43     
44      ranks=event.getRanks() ;
45      if (! event.isEmpty())
46      {
47        sizes=event.getSizes() ;
48        CMessage msg ;
49
50        msg<<*(sizes.begin())<<timeLine ;
51        for(list<int>::iterator it=sizes.begin();it!=sizes.end();it++) *it+=msg.size() ;
52        list<CBufferOut*> buffList=getBuffers(ranks,sizes) ;
53     
54        list<CBufferOut*>::iterator it ;     
55        for(it=buffList.begin(),itSize=sizes.begin();it!=buffList.end();++it,++itSize)
56        {
57          **it<<*itSize<<timeLine ;
58        }
59        event.send(buffList) ;
60        checkBuffers(ranks) ;
61      }
62
63      if (context->hasServer) waitEvent(ranks) ;
64      timeLine++ ;
65    }
66     
67    void CContextClient::waitEvent(list<int>& ranks)
68    {
69      context->server->setPendingEvent() ;
70      while(checkBuffers(ranks))
71      {
72        context->server->listen() ;
73        context->server->checkPendingRequest() ;
74      }
75
76      while(context->server->hasPendingEvent())
77      {
78       context->server->eventLoop() ;
79      }
80     
81    }
82
83    list<CBufferOut*> CContextClient::getBuffers(list<int>& serverList, list<int>& sizeList)
84    {
85      list<int>::iterator itServer,itSize ;
86      list<CClientBuffer*> bufferList ; 
87      map<int,CClientBuffer*>::iterator it ; 
88      list<CClientBuffer*>::iterator itBuffer ; 
89      list<CBufferOut*>  retBuffer ;
90      bool free ;
91
92      for(itServer=serverList.begin();itServer!=serverList.end();itServer++) 
93      {
94        it=buffers.find(*itServer) ;
95        if (it==buffers.end()) 
96        {
97          newBuffer(*itServer) ;
98          it=buffers.find(*itServer) ;
99        }         
100        bufferList.push_back(it->second) ;
101      }
102      free=false ;
103
104      CTimer::get("Blocking time").resume();
105      while(!free)
106      {
107        free=true ;
108        for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++)
109        {
110          (*itBuffer)->checkBuffer() ;
111         free&=(*itBuffer)->isBufferFree(*itSize) ;
112        }
113      }
114      CTimer::get("Blocking time").suspend();
115
116      for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++)
117      {
118        retBuffer.push_back((*itBuffer)->getBuffer(*itSize)) ;
119      }
120      return retBuffer ;             
121   
122   }
123     
124   void CContextClient::newBuffer(int rank)
125   {
126      buffers[rank]=new CClientBuffer(interComm,rank) ;
127   } 
128
129   bool CContextClient::checkBuffers(void)
130   {
131      map<int,CClientBuffer*>::iterator itBuff ;
132      bool pending=false ;
133      for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) pending|=itBuff->second->checkBuffer() ;
134      return pending ;
135   } 
136
137   void CContextClient::releaseBuffers(void)
138   {
139      map<int,CClientBuffer*>::iterator itBuff ;
140      for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) delete itBuff->second ;
141   } 
142
143   bool CContextClient::checkBuffers(list<int>& ranks)
144   {
145      list<int>::iterator it ;
146      bool pending=false ;
147      for(it=ranks.begin();it!=ranks.end();it++) pending|=buffers[*it]->checkBuffer() ;
148      return pending ;
149   } 
150
151   int CContextClient::getServerLeader(void)
152   {
153     int clientByServer=clientSize/serverSize ;
154     int remain=clientSize%serverSize ;
155     
156     if (clientRank<(clientByServer+1)*remain)
157     {
158       return clientRank/(clientByServer+1) ;
159     }
160     else
161     {
162       int rank=clientRank-(clientByServer+1)*remain ;
163       int nbServer=serverSize-remain ;
164       return remain+rank/clientByServer ;
165     }
166   }     
167
168   bool CContextClient::isServerLeader(void)
169   {
170     int clientByServer=clientSize/serverSize ;
171     int remain=clientSize%serverSize ;
172     
173     if (clientRank<(clientByServer+1)*remain)
174     {
175       if (clientRank%(clientByServer+1)==0) return true ;
176       else return false ;
177     }
178     else
179     {
180       int rank=clientRank-(clientByServer+1)*remain ;
181       int nbServer=serverSize-remain ;
182       if  (rank%clientByServer==0) return true ;
183       else return false ;
184     } 
185   }
186     
187   void CContextClient::finalize(void)
188   {
189     
190     map<int,CClientBuffer*>::iterator itBuff ;
191     bool stop=true ;
192
193     CEventClient event(CContext::GetType(),CContext::EVENT_ID_CONTEXT_FINALIZE) ;   
194     if (isServerLeader())
195     {
196       CMessage msg ;
197       event.push(getServerLeader(),1,msg) ;
198     }
199     sendEvent(event) ;
200 
201     CTimer::get("Blocking time").resume();
202     while(stop)
203     {
204       checkBuffers() ;
205       stop=false ;
206       for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) stop|=itBuff->second->hasPendingRequest() ;
207     }
208     CTimer::get("Blocking time").suspend();
209     report(0)<< " Memory report : Context <"<<context->getId()<<"> : client side : total memory used for buffer "<<buffers.size()*CXios::bufferSize<<" bytes"<<endl ;
210     
211     releaseBuffers() ;
212   }
213}     
Note: See TracBrowser for help on using the repository browser.