source: XIOS/trunk/src/context_server.cpp @ 489

Last change on this file since 489 was 489, checked in by mhnguyen, 10 years ago

Ticket 50: Implementing the getting/setting methods for Fortran interface

+) Add some C and Fortran functions to set and get data to/from CVariable with an id
+) Add method to send, receive and dispatch in CVariable
+) Add dispatch method in server class

Test
-) On Curie
-) Test data: integer, float, double, boolean, string
-) File: one and multiple, using_server: ON and OFF
+) All test cases passed and had correct results

  • Property svn:eol-style set to native
File size: 5.8 KB
Line 
1#include "context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
8#include "domain.hpp"
9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "cxios.hpp"
16
17
18
19namespace xios
20{
21
22  CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
23  {
24    context=parent ;
25    intraComm=intraComm_ ;
26    MPI_Comm_size(intraComm,&intraCommSize) ;
27    MPI_Comm_rank(intraComm,&intraCommRank) ;
28    interComm=interComm_ ;
29    int flag ;
30    MPI_Comm_test_inter(interComm,&flag) ;
31    if (flag) MPI_Comm_remote_size(interComm,&commSize);
32    else  MPI_Comm_size(interComm,&commSize) ;
33    currentTimeLine=0 ;
34    finished=false ;
35  }
36  void CContextServer::setPendingEvent(void)
37  {
38    pendingEvent=true ;
39  }
40
41  bool CContextServer::hasPendingEvent(void)
42  {
43    return pendingEvent ;
44  }
45
46  bool CContextServer::eventLoop(void)
47  {
48    listen() ;
49    checkPendingRequest() ;
50    processEvents() ;
51    return finished ;
52  }
53
54  void CContextServer::listen(void)
55  {
56    int rank;
57    int flag ;
58    int count ;
59    char * addr ;
60    MPI_Status status;
61    map<int,CServerBuffer*>::iterator it;
62
63    for(rank=0;rank<commSize;rank++)
64    {
65      if (pendingRequest.find(rank)==pendingRequest.end())
66      {
67        traceOff() ;
68        MPI_Iprobe(rank,20,interComm,&flag,&status);
69        traceOn() ;
70        if (flag==true)
71        {
72          it=buffers.find(rank) ;
73          if (it==buffers.end())
74            it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer))).first ;
75          MPI_Get_count(&status,MPI_CHAR,&count) ;
76          if (it->second->isBufferFree(count))
77          {
78            addr=(char*)it->second->getBuffer(count) ;
79            MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]) ;
80            bufferRequest[rank]=addr ;
81          }
82        }
83      }
84    }
85  }
86
87  void CContextServer::checkPendingRequest(void)
88  {
89    map<int,MPI_Request>::iterator it;
90    list<int> recvRequest ;
91    list<int>::iterator itRecv;
92    int rank ;
93    int flag ;
94    int count ;
95    MPI_Status status ;
96
97    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
98    {
99      rank=it->first ;
100      traceOff() ;
101      MPI_Test(& it->second, &flag, &status) ;
102      traceOn() ;
103      if (flag==true)
104      {
105        recvRequest.push_back(rank) ;
106        MPI_Get_count(&status,MPI_CHAR,&count) ;
107        processRequest(rank,bufferRequest[rank],count) ;
108      }
109    }
110
111    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
112    {
113      pendingRequest.erase(*itRecv) ;
114      bufferRequest.erase(*itRecv) ;
115    }
116  }
117
118  void CContextServer::processRequest(int rank, char* buff,int count)
119  {
120
121    CBufferIn buffer(buff,count) ;
122    char* startBuffer,endBuffer ;
123    int size, offset ;
124    size_t timeLine ;
125    map<size_t,CEventServer*>::iterator it ;
126
127    while(count>0)
128    {
129      char* startBuffer=(char*)buffer.ptr() ;
130      CBufferIn newBuffer(startBuffer,buffer.remain()) ;
131      newBuffer>>size>>timeLine ;
132
133      it=events.find(timeLine) ;
134      if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first ;
135      it->second->push(rank,buffers[rank],startBuffer,size) ;
136
137      buffer.advance(size) ;
138      count=buffer.remain() ;
139    }
140
141  }
142
143  void CContextServer::processEvents(void)
144  {
145    map<size_t,CEventServer*>::iterator it ;
146    CEventServer* event ;
147
148    it=events.find(currentTimeLine) ;
149    if (it!=events.end())
150    {
151      event=it->second ;
152      if (event->isFull())
153      {
154         CTimer::get("Process events").resume() ;
155         dispatchEvent(*event) ;
156         CTimer::get("Process events").suspend() ;
157         pendingEvent=false ;
158         delete event ;
159         events.erase(it) ;
160         currentTimeLine++ ;
161       }
162     }
163   }
164
165  CContextServer::~CContextServer()
166  {
167    map<int,CServerBuffer*>::iterator it ;
168    for(it=buffers.begin();it!=buffers.end();++it) delete it->second ;
169  }
170
171
172  void CContextServer::dispatchEvent(CEventServer& event)
173  {
174    string contextName ;
175    string buff ;
176    int MsgSize ;
177    int rank ;
178    list<CEventServer::SSubEvent>::iterator it ;
179    CContext::setCurrent(context->getId()) ;
180
181    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
182    {
183      info(20)<<"Server Side context <"<<context->getId()<<"> finalized"<<endl ;
184      context->finalize() ;
185      finished=true ;
186      report(0)<< " Memory report : Context <"<<context->getId()<<"> : server side : total memory used for buffer "<<buffers.size()*CXios::bufferSize<<" bytes"<<endl ;
187    }
188    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event) ;
189    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event) ;
190    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event) ;
191    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event) ;
192    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event) ;
193    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event) ;
194    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event) ;
195    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event) ;
196    else if (event.classId==CField::GetType()) CField::dispatchEvent(event) ;
197    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event) ;
198    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event) ;
199    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event) ;
200    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event) ;
201    else
202    {
203      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl) ;
204    }
205  }
206}
Note: See TracBrowser for help on using the repository browser.