source: XMLIO_V2/dev/common/src/context_server.cpp @ 300

Last change on this file since 300 was 300, checked in by ymipsl, 12 years ago

nouvelle version de developpement de xios

  • nouvelle interface fortran
  • recodage complet de la couche de communication
  • et bien d'autres choses...

YM

  • Property svn:eol-style set to native
File size: 5.5 KB
Line 
1#include "context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "transfert_parameters.hpp"
5#include "context.hpp"
6#include "object_template_impl.hpp"
7#include "group_template_impl.hpp"
8#include "tree_manager.hpp"
9#include "domain.hpp"
10
11#include <mpi.h>
12
13
14
15namespace xmlioserver
16{
17
18  CContextServer::CContextServer(tree::CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
19  {
20    context=parent ;
21    intraComm=intraComm_ ;
22    MPI_Comm_size(intraComm,&intraCommSize) ;
23    MPI_Comm_rank(intraComm,&intraCommRank) ;
24    interComm=interComm_ ;
25    int flag ;
26    MPI_Comm_test_inter(interComm,&flag) ;
27    if (flag) MPI_Comm_remote_size(interComm,&commSize);
28    else  MPI_Comm_size(interComm,&commSize) ;
29    currentTimeLine=0 ;
30    finished=false ;
31  }
32  void CContextServer::setPendingEvent(void)
33  {
34    pendingEvent=true ;
35  }
36 
37  bool CContextServer::hasPendingEvent(void)
38  {
39    return pendingEvent ;
40  }
41 
42  bool CContextServer::eventLoop(void)
43  {
44    listen() ;
45    checkPendingRequest() ;
46    processEvents() ;
47    return finished ;
48  }
49
50  void CContextServer::listen(void)
51  {
52    int rank;
53    int flag ;
54    int count ;
55    char * addr ;
56    MPI_Status status; 
57    map<int,CServerBuffer*>::iterator it;
58   
59    for(rank=0;rank<commSize;rank++)
60    {
61      if (pendingRequest.find(rank)==pendingRequest.end())
62      {
63        MPI_Iprobe(rank,20,interComm,&flag,&status);     
64        if (flag==true)
65        {
66          it=buffers.find(rank) ;
67          if (it==buffers.end()) 
68            it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer))).first ;
69          MPI_Get_count(&status,MPI_CHAR,&count) ;
70          if (it->second->isBufferFree(count))
71          {
72            addr=(char*)it->second->getBuffer(count) ;
73            MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]) ;
74            bufferRequest[rank]=addr ;
75          }
76        }
77      }
78    }
79  }
80 
81  void CContextServer::checkPendingRequest(void)
82  {
83    map<int,MPI_Request>::iterator it;
84    list<int> recvRequest ;
85    list<int>::iterator itRecv;
86    int rank ;
87    int flag ;
88    int count ;
89    MPI_Status status ;
90   
91    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
92    {
93      rank=it->first ;
94      MPI_Test(& it->second, &flag, &status) ;
95      if (flag==true)
96      {
97        recvRequest.push_back(rank) ;
98        MPI_Get_count(&status,MPI_CHAR,&count) ;
99        processRequest(rank,bufferRequest[rank],count) ;
100      }
101    }
102   
103    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++) 
104    {
105      pendingRequest.erase(*itRecv) ;
106      bufferRequest.erase(*itRecv) ;
107    }
108  }
109 
110  void CContextServer::processRequest(int rank, char* buff,int count)
111  {
112   
113    CBufferIn buffer(buff,count) ;
114    char* startBuffer,endBuffer ;
115    int size, offset ;
116    size_t timeLine ;
117    map<size_t,CEventServer*>::iterator it ;
118       
119    while(count>0)
120    {
121      char* startBuffer=(char*)buffer.ptr() ;
122      CBufferIn newBuffer(startBuffer,buffer.remain()) ;
123      newBuffer>>size>>timeLine ;
124
125      it=events.find(timeLine) ;
126      if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first ;
127      it->second->push(rank,buffers[rank],startBuffer,size) ;
128
129      buffer.advance(size) ;
130      count=buffer.remain() ;           
131    } 
132 
133  }
134   
135  void CContextServer::processEvents(void)
136  {
137    map<size_t,CEventServer*>::iterator it ;
138    CEventServer* event ;
139   
140    it=events.find(currentTimeLine) ;
141    if (it!=events.end()) 
142    {
143      event=it->second ;
144      if (event->isFull())
145      {
146         dispatchEvent(*event) ;
147         pendingEvent=false ;
148         delete event ;
149         events.erase(it) ;
150         currentTimeLine++ ;
151       }
152     }
153   }
154       
155  CContextServer::~CContextServer()
156  {
157    map<int,CServerBuffer*>::iterator it ;
158    for(it=buffers.begin();it!=buffers.end();++it) delete it->second ; 
159  } 
160
161
162  void CContextServer::dispatchEvent(CEventServer& event)
163  {
164    string contextName ;
165    string buff ;
166    int MsgSize ;
167    int rank ;
168    list<CEventServer::SSubEvent>::iterator it ;
169    tree::CTreeManager::SetCurrentContextId(context->getId()) ;
170       
171    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
172    {
173      info(20)<<"Server Side context <"<<context->getId()<<"> finalized"<<endl ;
174      context->finalize() ;
175      finished=true ;
176    }
177    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event) ;
178    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event) ;
179    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event) ;
180    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event) ;
181    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event) ;
182    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event) ;
183    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event) ;
184    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event) ;
185    else if (event.classId==CField::GetType()) CField::dispatchEvent(event) ;
186    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event) ;
187    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event) ;
188    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event) ;
189    else
190    {
191      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl) ;
192    }
193  }
194}
Note: See TracBrowser for help on using the repository browser.