source: XIOS/trunk/src/context_server.cpp @ 321

Last change on this file since 321 was 314, checked in by ymipsl, 12 years ago

Removing obsolete files and some cleaning...

YM

  • Property svn:eol-style set to native
File size: 5.4 KB
Line 
1#include "context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template_impl.hpp"
6#include "group_template_impl.hpp"
7#include "tree_manager.hpp"
8#include "domain.hpp"
9
10#include <mpi.h>
11
12
13
14namespace xmlioserver
15{
16
17  CContextServer::CContextServer(tree::CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
18  {
19    context=parent ;
20    intraComm=intraComm_ ;
21    MPI_Comm_size(intraComm,&intraCommSize) ;
22    MPI_Comm_rank(intraComm,&intraCommRank) ;
23    interComm=interComm_ ;
24    int flag ;
25    MPI_Comm_test_inter(interComm,&flag) ;
26    if (flag) MPI_Comm_remote_size(interComm,&commSize);
27    else  MPI_Comm_size(interComm,&commSize) ;
28    currentTimeLine=0 ;
29    finished=false ;
30  }
31  void CContextServer::setPendingEvent(void)
32  {
33    pendingEvent=true ;
34  }
35 
36  bool CContextServer::hasPendingEvent(void)
37  {
38    return pendingEvent ;
39  }
40 
41  bool CContextServer::eventLoop(void)
42  {
43    listen() ;
44    checkPendingRequest() ;
45    processEvents() ;
46    return finished ;
47  }
48
49  void CContextServer::listen(void)
50  {
51    int rank;
52    int flag ;
53    int count ;
54    char * addr ;
55    MPI_Status status; 
56    map<int,CServerBuffer*>::iterator it;
57   
58    for(rank=0;rank<commSize;rank++)
59    {
60      if (pendingRequest.find(rank)==pendingRequest.end())
61      {
62        MPI_Iprobe(rank,20,interComm,&flag,&status);     
63        if (flag==true)
64        {
65          it=buffers.find(rank) ;
66          if (it==buffers.end()) 
67            it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer))).first ;
68          MPI_Get_count(&status,MPI_CHAR,&count) ;
69          if (it->second->isBufferFree(count))
70          {
71            addr=(char*)it->second->getBuffer(count) ;
72            MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]) ;
73            bufferRequest[rank]=addr ;
74          }
75        }
76      }
77    }
78  }
79 
80  void CContextServer::checkPendingRequest(void)
81  {
82    map<int,MPI_Request>::iterator it;
83    list<int> recvRequest ;
84    list<int>::iterator itRecv;
85    int rank ;
86    int flag ;
87    int count ;
88    MPI_Status status ;
89   
90    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
91    {
92      rank=it->first ;
93      MPI_Test(& it->second, &flag, &status) ;
94      if (flag==true)
95      {
96        recvRequest.push_back(rank) ;
97        MPI_Get_count(&status,MPI_CHAR,&count) ;
98        processRequest(rank,bufferRequest[rank],count) ;
99      }
100    }
101   
102    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++) 
103    {
104      pendingRequest.erase(*itRecv) ;
105      bufferRequest.erase(*itRecv) ;
106    }
107  }
108 
109  void CContextServer::processRequest(int rank, char* buff,int count)
110  {
111   
112    CBufferIn buffer(buff,count) ;
113    char* startBuffer,endBuffer ;
114    int size, offset ;
115    size_t timeLine ;
116    map<size_t,CEventServer*>::iterator it ;
117       
118    while(count>0)
119    {
120      char* startBuffer=(char*)buffer.ptr() ;
121      CBufferIn newBuffer(startBuffer,buffer.remain()) ;
122      newBuffer>>size>>timeLine ;
123
124      it=events.find(timeLine) ;
125      if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first ;
126      it->second->push(rank,buffers[rank],startBuffer,size) ;
127
128      buffer.advance(size) ;
129      count=buffer.remain() ;           
130    } 
131 
132  }
133   
134  void CContextServer::processEvents(void)
135  {
136    map<size_t,CEventServer*>::iterator it ;
137    CEventServer* event ;
138   
139    it=events.find(currentTimeLine) ;
140    if (it!=events.end()) 
141    {
142      event=it->second ;
143      if (event->isFull())
144      {
145         dispatchEvent(*event) ;
146         pendingEvent=false ;
147         delete event ;
148         events.erase(it) ;
149         currentTimeLine++ ;
150       }
151     }
152   }
153       
154  CContextServer::~CContextServer()
155  {
156    map<int,CServerBuffer*>::iterator it ;
157    for(it=buffers.begin();it!=buffers.end();++it) delete it->second ; 
158  } 
159
160
161  void CContextServer::dispatchEvent(CEventServer& event)
162  {
163    string contextName ;
164    string buff ;
165    int MsgSize ;
166    int rank ;
167    list<CEventServer::SSubEvent>::iterator it ;
168    tree::CTreeManager::SetCurrentContextId(context->getId()) ;
169       
170    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
171    {
172      info(20)<<"Server Side context <"<<context->getId()<<"> finalized"<<endl ;
173      context->finalize() ;
174      finished=true ;
175    }
176    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event) ;
177    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event) ;
178    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event) ;
179    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event) ;
180    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event) ;
181    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event) ;
182    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event) ;
183    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event) ;
184    else if (event.classId==CField::GetType()) CField::dispatchEvent(event) ;
185    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event) ;
186    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event) ;
187    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event) ;
188    else
189    {
190      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl) ;
191    }
192  }
193}
Note: See TracBrowser for help on using the repository browser.