source: XIOS/trunk/src/context_client.cpp @ 391

Last change on this file since 391 was 386, checked in by ymipsl, 12 years ago

Suprress a dead-lock rising in connected mod, probably due to the quality of MPI implementation (openMPI at less).

YM

  • Property svn:eol-style set to native
File size: 5.6 KB
Line 
1#include "xmlioserver_spl.hpp"
2#include "context_client.hpp"
3#include "context_server.hpp"
4#include "event_client.hpp"
5#include "buffer_out.hpp"
6#include "buffer_client.hpp"
7#include "type.hpp"
8#include "message.hpp"
9#include "event_client.hpp"
10#include "context.hpp"
11#include "mpi.hpp"
12#include "timer.hpp"
13
14namespace xios
15{
16
17
18    CContextClient::CContextClient(CContext* parent,MPI_Comm intraComm_, MPI_Comm interComm_)
19    {
20      context=parent ;
21      intraComm=intraComm_ ;
22      interComm=interComm_ ;
23      MPI_Comm_rank(intraComm,&clientRank) ;
24      MPI_Comm_size(intraComm,&clientSize) ;
25     
26      int flag ;
27      MPI_Comm_test_inter(interComm,&flag) ;
28      if (flag) MPI_Comm_remote_size(interComm,&serverSize);
29      else  MPI_Comm_size(interComm,&serverSize) ;
30     
31      timeLine=0 ;
32
33    }
34
35
36    void CContextClient::sendEvent(CEventClient& event)
37    {
38      list<int>::iterator itServer ;
39      list<int> ranks ;
40      list<int> sizes ; 
41      list<int>::iterator itSize ;
42     
43      ranks=event.getRanks() ;
44      if (! event.isEmpty())
45      {
46        sizes=event.getSizes() ;
47        CMessage msg ;
48
49        msg<<*(sizes.begin())<<timeLine ;
50        for(list<int>::iterator it=sizes.begin();it!=sizes.end();it++) *it+=msg.size() ;
51        list<CBufferOut*> buffList=getBuffers(ranks,sizes) ;
52     
53        list<CBufferOut*>::iterator it ;     
54        for(it=buffList.begin(),itSize=sizes.begin();it!=buffList.end();++it,++itSize)
55        {
56          **it<<*itSize<<timeLine ;
57        }
58        event.send(buffList) ;
59        checkBuffers(ranks) ;
60      }
61
62      if (context->hasServer) waitEvent(ranks) ;
63      timeLine++ ;
64    }
65     
66    void CContextClient::waitEvent(list<int>& ranks)
67    {
68      context->server->setPendingEvent() ;
69      while(checkBuffers(ranks))
70      {
71        context->server->listen() ;
72        context->server->checkPendingRequest() ;
73      }
74
75      while(context->server->hasPendingEvent())
76      {
77       context->server->eventLoop() ;
78      }
79     
80    }
81
82    list<CBufferOut*> CContextClient::getBuffers(list<int>& serverList, list<int>& sizeList)
83    {
84      list<int>::iterator itServer,itSize ;
85      list<CClientBuffer*> bufferList ; 
86      map<int,CClientBuffer*>::iterator it ; 
87      list<CClientBuffer*>::iterator itBuffer ; 
88      list<CBufferOut*>  retBuffer ;
89      bool free ;
90
91      for(itServer=serverList.begin();itServer!=serverList.end();itServer++) 
92      {
93        it=buffers.find(*itServer) ;
94        if (it==buffers.end()) 
95        {
96          newBuffer(*itServer) ;
97          it=buffers.find(*itServer) ;
98        }         
99        bufferList.push_back(it->second) ;
100      }
101      free=false ;
102
103      CTimer::get("Blocking time").resume();
104      while(!free)
105      {
106        free=true ;
107        for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++)
108        {
109          (*itBuffer)->checkBuffer() ;
110         free&=(*itBuffer)->isBufferFree(*itSize) ;
111        }
112      }
113      CTimer::get("Blocking time").suspend();
114
115      for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++)
116      {
117        retBuffer.push_back((*itBuffer)->getBuffer(*itSize)) ;
118      }
119      return retBuffer ;             
120   
121   }
122     
123   void CContextClient::newBuffer(int rank)
124   {
125      buffers[rank]=new CClientBuffer(interComm,rank) ;
126   } 
127
128   bool CContextClient::checkBuffers(void)
129   {
130      map<int,CClientBuffer*>::iterator itBuff ;
131      bool pending=false ;
132      for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) pending|=itBuff->second->checkBuffer() ;
133      return pending ;
134   } 
135
136   void CContextClient::releaseBuffers(void)
137   {
138      map<int,CClientBuffer*>::iterator itBuff ;
139      for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) delete itBuff->second ;
140   } 
141
142   bool CContextClient::checkBuffers(list<int>& ranks)
143   {
144      list<int>::iterator it ;
145      bool pending=false ;
146      for(it=ranks.begin();it!=ranks.end();it++) pending|=buffers[*it]->checkBuffer() ;
147      return pending ;
148   } 
149
150   int CContextClient::getServerLeader(void)
151   {
152     int clientByServer=clientSize/serverSize ;
153     int remain=clientSize%serverSize ;
154     
155     if (clientRank<(clientByServer+1)*remain)
156     {
157       return clientRank/(clientByServer+1) ;
158     }
159     else
160     {
161       int rank=clientRank-(clientByServer+1)*remain ;
162       int nbServer=serverSize-remain ;
163       return remain+rank/clientByServer ;
164     }
165   }     
166
167   bool CContextClient::isServerLeader(void)
168   {
169     int clientByServer=clientSize/serverSize ;
170     int remain=clientSize%serverSize ;
171     
172     if (clientRank<(clientByServer+1)*remain)
173     {
174       if (clientRank%(clientByServer+1)==0) return true ;
175       else return false ;
176     }
177     else
178     {
179       int rank=clientRank-(clientByServer+1)*remain ;
180       int nbServer=serverSize-remain ;
181       if  (rank%clientByServer==0) return true ;
182       else return false ;
183     } 
184   }
185     
186   void CContextClient::finalize(void)
187   {
188     
189     map<int,CClientBuffer*>::iterator itBuff ;
190     bool stop=true ;
191
192     CEventClient event(CContext::GetType(),CContext::EVENT_ID_CONTEXT_FINALIZE) ;   
193     if (isServerLeader())
194     {
195       CMessage msg ;
196       event.push(getServerLeader(),1,msg) ;
197     }
198     sendEvent(event) ;
199 
200     CTimer::get("Blocking time").resume();
201     while(stop)
202     {
203       checkBuffers() ;
204       stop=false ;
205       for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) stop|=itBuff->second->hasPendingRequest() ;
206     }
207     CTimer::get("Blocking time").suspend();
208     
209     releaseBuffers() ;
210   }
211}     
Note: See TracBrowser for help on using the repository browser.