source: vendor/nemo/current/NEMOGCM/EXTERNAL/XIOS/src/context_server.cpp @ 44

Last change on this file since 44 was 44, checked in by cholod, 12 years ago

Load NEMO_TMP into vendor/nemo/current.

File size: 5.7 KB
Line 
1#include "context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
8#include "domain.hpp"
9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
12#include <mpi.h>
13#include "tracer.hpp"
14#include "timer.hpp"
15
16
17
18namespace xios
19{
20
21  CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
22  {
23    context=parent ;
24    intraComm=intraComm_ ;
25    MPI_Comm_size(intraComm,&intraCommSize) ;
26    MPI_Comm_rank(intraComm,&intraCommRank) ;
27    interComm=interComm_ ;
28    int flag ;
29    MPI_Comm_test_inter(interComm,&flag) ;
30    if (flag) MPI_Comm_remote_size(interComm,&commSize);
31    else  MPI_Comm_size(interComm,&commSize) ;
32    currentTimeLine=0 ;
33    finished=false ;
34  }
35  void CContextServer::setPendingEvent(void)
36  {
37    pendingEvent=true ;
38  }
39 
40  bool CContextServer::hasPendingEvent(void)
41  {
42    return pendingEvent ;
43  }
44 
45  bool CContextServer::eventLoop(void)
46  {
47    listen() ;
48    checkPendingRequest() ;
49    processEvents() ;
50    return finished ;
51  }
52
53  void CContextServer::listen(void)
54  {
55    int rank;
56    int flag ;
57    int count ;
58    char * addr ;
59    MPI_Status status; 
60    map<int,CServerBuffer*>::iterator it;
61   
62    for(rank=0;rank<commSize;rank++)
63    {
64      if (pendingRequest.find(rank)==pendingRequest.end())
65      {
66        traceOff() ;
67        MPI_Iprobe(rank,20,interComm,&flag,&status);     
68        traceOn() ;
69        if (flag==true)
70        {
71          it=buffers.find(rank) ;
72          if (it==buffers.end()) 
73            it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer))).first ;
74          MPI_Get_count(&status,MPI_CHAR,&count) ;
75          if (it->second->isBufferFree(count))
76          {
77            addr=(char*)it->second->getBuffer(count) ;
78            MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]) ;
79            bufferRequest[rank]=addr ;
80          }
81        }
82      }
83    }
84  }
85 
86  void CContextServer::checkPendingRequest(void)
87  {
88    map<int,MPI_Request>::iterator it;
89    list<int> recvRequest ;
90    list<int>::iterator itRecv;
91    int rank ;
92    int flag ;
93    int count ;
94    MPI_Status status ;
95   
96    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
97    {
98      rank=it->first ;
99      traceOff() ;
100      MPI_Test(& it->second, &flag, &status) ;
101      traceOn() ;
102      if (flag==true)
103      {
104        recvRequest.push_back(rank) ;
105        MPI_Get_count(&status,MPI_CHAR,&count) ;
106        processRequest(rank,bufferRequest[rank],count) ;
107      }
108    }
109   
110    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++) 
111    {
112      pendingRequest.erase(*itRecv) ;
113      bufferRequest.erase(*itRecv) ;
114    }
115  }
116 
117  void CContextServer::processRequest(int rank, char* buff,int count)
118  {
119   
120    CBufferIn buffer(buff,count) ;
121    char* startBuffer,endBuffer ;
122    int size, offset ;
123    size_t timeLine ;
124    map<size_t,CEventServer*>::iterator it ;
125       
126    while(count>0)
127    {
128      char* startBuffer=(char*)buffer.ptr() ;
129      CBufferIn newBuffer(startBuffer,buffer.remain()) ;
130      newBuffer>>size>>timeLine ;
131
132      it=events.find(timeLine) ;
133      if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first ;
134      it->second->push(rank,buffers[rank],startBuffer,size) ;
135
136      buffer.advance(size) ;
137      count=buffer.remain() ;           
138    } 
139 
140  }
141   
142  void CContextServer::processEvents(void)
143  {
144    map<size_t,CEventServer*>::iterator it ;
145    CEventServer* event ;
146   
147    it=events.find(currentTimeLine) ;
148    if (it!=events.end()) 
149    {
150      event=it->second ;
151      if (event->isFull())
152      {
153         CTimer::get("Process events").resume() ;
154         dispatchEvent(*event) ;
155         CTimer::get("Process events").suspend() ;
156         pendingEvent=false ;
157         delete event ;
158         events.erase(it) ;
159         currentTimeLine++ ;
160       }
161     }
162   }
163       
164  CContextServer::~CContextServer()
165  {
166    map<int,CServerBuffer*>::iterator it ;
167    for(it=buffers.begin();it!=buffers.end();++it) delete it->second ; 
168  } 
169
170
171  void CContextServer::dispatchEvent(CEventServer& event)
172  {
173    string contextName ;
174    string buff ;
175    int MsgSize ;
176    int rank ;
177    list<CEventServer::SSubEvent>::iterator it ;
178    CContext::setCurrent(context->getId()) ;
179       
180    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
181    {
182      info(20)<<"Server Side context <"<<context->getId()<<"> finalized"<<endl ;
183      context->finalize() ;
184      finished=true ;
185    }
186    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event) ;
187    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event) ;
188    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event) ;
189    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event) ;
190    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event) ;
191    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event) ;
192    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event) ;
193    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event) ;
194    else if (event.classId==CField::GetType()) CField::dispatchEvent(event) ;
195    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event) ;
196    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event) ;
197    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event) ;
198    else
199    {
200      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl) ;
201    }
202  }
203}
Note: See TracBrowser for help on using the repository browser.