New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
context_client.cpp in vendors/XIOS/current/src – NEMO

source: vendors/XIOS/current/src/context_client.cpp @ 3428

Last change on this file since 3428 was 3428, checked in by rblod, 12 years ago

importing initial XIOS vendor drop

File size: 5.4 KB
Line 
1#include "xmlioserver_spl.hpp"
2#include "context_client.hpp"
3#include "context_server.hpp"
4#include "event_client.hpp"
5#include "buffer_out.hpp"
6#include "buffer_client.hpp"
7#include "type.hpp"
8#include "message.hpp"
9#include "event_client.hpp"
10#include "context.hpp"
11#include <mpi.h>
12#include "timer.hpp"
13
14namespace xios
15{
16
17
18    CContextClient::CContextClient(CContext* parent,MPI_Comm intraComm_, MPI_Comm interComm_)
19    {
20      context=parent ;
21      intraComm=intraComm_ ;
22      interComm=interComm_ ;
23      MPI_Comm_rank(intraComm,&clientRank) ;
24      MPI_Comm_size(intraComm,&clientSize) ;
25     
26      int flag ;
27      MPI_Comm_test_inter(interComm,&flag) ;
28      if (flag) MPI_Comm_remote_size(interComm,&serverSize);
29      else  MPI_Comm_size(interComm,&serverSize) ;
30     
31      timeLine=0 ;
32
33    }
34
35
36    void CContextClient::sendEvent(CEventClient& event)
37    {
38      list<int>::iterator itServer ;
39      list<int> ranks ;
40      list<int> sizes ; 
41      list<int>::iterator itSize ;
42     
43      ranks=event.getRanks() ;
44      if (! event.isEmpty())
45      {
46        sizes=event.getSizes() ;
47        CMessage msg ;
48
49        msg<<*(sizes.begin())<<timeLine ;
50        for(list<int>::iterator it=sizes.begin();it!=sizes.end();it++) *it+=msg.size() ;
51        list<CBufferOut*> buffList=getBuffers(ranks,sizes) ;
52     
53        list<CBufferOut*>::iterator it ;     
54        for(it=buffList.begin(),itSize=sizes.begin();it!=buffList.end();++it,++itSize)
55        {
56          **it<<*itSize<<timeLine ;
57        }
58        event.send(buffList) ;
59        checkBuffers(ranks) ;
60      }
61
62      if (context->hasServer) waitEvent(ranks) ;
63      timeLine++ ;
64    }
65     
66    void CContextClient::waitEvent(list<int>& ranks)
67    {
68      context->server->setPendingEvent() ;
69      while(checkBuffers(ranks) || context->server->hasPendingEvent())
70      {
71        context->server->eventLoop() ;
72      }
73    }
74
75    list<CBufferOut*> CContextClient::getBuffers(list<int>& serverList, list<int>& sizeList)
76    {
77      list<int>::iterator itServer,itSize ;
78      list<CClientBuffer*> bufferList ; 
79      map<int,CClientBuffer*>::iterator it ; 
80      list<CClientBuffer*>::iterator itBuffer ; 
81      list<CBufferOut*>  retBuffer ;
82      bool free ;
83
84      for(itServer=serverList.begin();itServer!=serverList.end();itServer++) 
85      {
86        it=buffers.find(*itServer) ;
87        if (it==buffers.end()) 
88        {
89          newBuffer(*itServer) ;
90          it=buffers.find(*itServer) ;
91        }         
92        bufferList.push_back(it->second) ;
93      }
94      free=false ;
95
96      CTimer::get("Blocking time").resume();
97      while(!free)
98      {
99        free=true ;
100        for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++)
101        {
102          (*itBuffer)->checkBuffer() ;
103         free&=(*itBuffer)->isBufferFree(*itSize) ;
104        }
105      }
106      CTimer::get("Blocking time").suspend();
107
108      for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++)
109      {
110        retBuffer.push_back((*itBuffer)->getBuffer(*itSize)) ;
111      }
112      return retBuffer ;             
113   
114   }
115     
116   void CContextClient::newBuffer(int rank)
117   {
118      buffers[rank]=new CClientBuffer(interComm,rank) ;
119   } 
120
121   bool CContextClient::checkBuffers(void)
122   {
123      map<int,CClientBuffer*>::iterator itBuff ;
124      bool pending=false ;
125      for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) pending|=itBuff->second->checkBuffer() ;
126      return pending ;
127   } 
128
129   void CContextClient::releaseBuffers(void)
130   {
131      map<int,CClientBuffer*>::iterator itBuff ;
132      for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) delete itBuff->second ;
133   } 
134
135   bool CContextClient::checkBuffers(list<int>& ranks)
136   {
137      list<int>::iterator it ;
138      bool pending=false ;
139      for(it=ranks.begin();it!=ranks.end();it++) pending|=buffers[*it]->checkBuffer() ;
140      return pending ;
141   } 
142
143   int CContextClient::getServerLeader(void)
144   {
145     int clientByServer=clientSize/serverSize ;
146     int remain=clientSize%serverSize ;
147     
148     if (clientRank<(clientByServer+1)*remain)
149     {
150       return clientRank/(clientByServer+1) ;
151     }
152     else
153     {
154       int rank=clientRank-(clientByServer+1)*remain ;
155       int nbServer=serverSize-remain ;
156       return remain+rank/clientByServer ;
157     }
158   }     
159
160   bool CContextClient::isServerLeader(void)
161   {
162     int clientByServer=clientSize/serverSize ;
163     int remain=clientSize%serverSize ;
164     
165     if (clientRank<(clientByServer+1)*remain)
166     {
167       if (clientRank%(clientByServer+1)==0) return true ;
168       else return false ;
169     }
170     else
171     {
172       int rank=clientRank-(clientByServer+1)*remain ;
173       int nbServer=serverSize-remain ;
174       if  (rank%clientByServer==0) return true ;
175       else return false ;
176     } 
177   }
178     
179   void CContextClient::finalize(void)
180   {
181     
182     map<int,CClientBuffer*>::iterator itBuff ;
183     bool stop=true ;
184
185     CEventClient event(CContext::GetType(),CContext::EVENT_ID_CONTEXT_FINALIZE) ;   
186     if (isServerLeader())
187     {
188       CMessage msg ;
189       event.push(getServerLeader(),1,msg) ;
190     }
191     sendEvent(event) ;
192 
193     CTimer::get("Blocking time").resume();
194     while(stop)
195     {
196       checkBuffers() ;
197       stop=false ;
198       for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) stop|=itBuff->second->hasPendingRequest() ;
199     }
200     CTimer::get("Blocking time").suspend();
201     
202     releaseBuffers() ;
203   }
204}     
Note: See TracBrowser for help on using the repository browser.