1 | #include "context_server.hpp" |
2 | #include "buffer_in.hpp" |
3 | #include "type.hpp" |
4 | #include "context.hpp" |
5 | #include "object_template.hpp" |
6 | #include "group_template.hpp" |
7 | #include "attribute_template.hpp" |
8 | #include "domain.hpp" |
9 | #include "field.hpp" |
10 | #include "file.hpp" |
11 | #include "grid.hpp" |
12 | #include "mpi_std.hpp" |
13 | #include "tracer.hpp" |
14 | #include "timer.hpp" |
15 | #include "cxios.hpp" |
16 | #include "event_scheduler.hpp" |
17 | #include "server.hpp" |
18 | #include <boost/functional/hash.hpp> |
19 | |
20 | |
21 | |
22 | namespace xios |
23 | { |
24 | |
25 | CContextServer::CContextServer(CContext* parent, ep_lib::MPI_Comm intraComm_, ep_lib::MPI_Comm interComm_) |
26 | { |
27 | context=parent; |
28 | intraComm=intraComm_; |
29 | MPI_Comm_size(intraComm,&intraCommSize); |
30 | MPI_Comm_rank(intraComm,&intraCommRank); |
31 | interComm=interComm_; |
32 | int flag; |
33 | MPI_Comm_test_inter(interComm,&flag); |
34 | if (flag) MPI_Comm_remote_size(interComm,&commSize); |
35 | else MPI_Comm_size(interComm,&commSize); |
36 | currentTimeLine=0; |
37 | scheduled=false; |
38 | finished=false; |
39 | |
40 | boost::hash<string> hashString; |
41 | hashId=hashString(context->getId()); |
42 | |
43 | } |
44 | void CContextServer::setPendingEvent(void) |
45 | { |
46 | pendingEvent=true; |
47 | } |
48 | |
49 | bool CContextServer::hasPendingEvent(void) |
50 | { |
51 | return pendingEvent; |
52 | } |
53 | |
54 | bool CContextServer::hasFinished(void) |
55 | { |
56 | return finished; |
57 | } |
58 | |
59 | bool CContextServer::eventLoop(void) |
60 | { |
61 | listen(); |
62 | checkPendingRequest(); |
63 | processEvents(); |
64 | return finished; |
65 | } |
66 | |
67 | void CContextServer::listen(void) |
68 | { |
69 | int rank; |
70 | int flag; |
71 | int count; |
72 | char * addr; |
73 | ep_lib::MPI_Status status; |
74 | map<int,CServerBuffer*>::iterator it; |
75 | |
76 | for(rank=0;rank<commSize;rank++) |
77 | { |
78 | //printf("in CContextServer::listen, rank = %d, commSize = %d, pendingRequest.find(rank) = %d\n", rank, commSize, pendingRequest.find(rank)); |
79 | if (pendingRequest.find(rank)==pendingRequest.end()) |
80 | { |
81 | traceOff(); |
82 | MPI_Iprobe(rank,20,interComm,&flag,&status); |
83 | traceOn(); |
84 | if (flag==true) |
85 | { |
86 | it=buffers.find(rank); |
87 | |
88 | if (it==buffers.end()) // Receive the buffer size and allocate the buffer |
89 | { |
90 | StdSize buffSize = 0; |
91 | MPI_Recv(&buffSize, 1, MPI_LONG, rank, 20, interComm, &status); |
92 | mapBufferSize_.insert(std::make_pair(rank, buffSize)); |
93 | it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first; |
94 | //printf("find message, is buffer end, receiving, buffSize = %d, rank = %d, commSize = %d\n", buffSize, rank, commSize); |
95 | } |
96 | else |
97 | { |
98 | |
99 | MPI_Get_count(&status,MPI_CHAR,&count); |
100 | if (it->second->isBufferFree(count)) |
101 | { |
102 | addr=(char*)it->second->getBuffer(count); |
103 | ep_lib::MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]); |
104 | bufferRequest[rank]=addr; |
105 | //printf("find message, i-receiving to buffer %p, rank = %d, commSize = %d\n", addr, rank, commSize); |
106 | } |
107 | } |
108 | } |
109 | } |
110 | } |
111 | } |
112 | |
113 | void CContextServer::checkPendingRequest(void) |
114 | { |
115 | map<int,ep_lib::MPI_Request>::iterator it; |
116 | list<int> recvRequest; |
117 | list<int>::iterator itRecv; |
118 | int rank; |
119 | int flag; |
120 | int count; |
121 | ep_lib::MPI_Status status; |
122 | |
123 | //printf("enter checkPendingRequest\n"); |
124 | if(!pendingRequest.empty()) |
125 | for(it=pendingRequest.begin();it!=pendingRequest.end();++it) |
126 | { |
127 | rank=it->first; |
128 | traceOff(); |
129 | MPI_Test(& it->second, &flag, &status); |
130 | traceOn(); |
131 | if (flag==true) |
132 | { |
133 | recvRequest.push_back(rank); |
134 | MPI_Get_count(&status,MPI_CHAR,&count); |
135 | processRequest(rank,bufferRequest[rank],count); |
136 | } |
137 | } |
138 | |
139 | if(!recvRequest.empty()) |
140 | for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++) |
141 | { |
142 | pendingRequest.erase(*itRecv); |
143 | bufferRequest.erase(*itRecv); |
144 | } |
145 | } |
146 | |
147 | void CContextServer::processRequest(int rank, char* buff,int count) |
148 | { |
149 | |
150 | CBufferIn buffer(buff,count); |
151 | char* startBuffer,endBuffer; |
152 | int size, offset; |
153 | size_t timeLine; |
154 | map<size_t,CEventServer*>::iterator it; |
155 | |
156 | while(count>0) |
157 | { |
158 | char* startBuffer=(char*)buffer.ptr(); |
159 | CBufferIn newBuffer(startBuffer,buffer.remain()); |
160 | newBuffer>>size>>timeLine; |
161 | |
162 | it=events.find(timeLine); |
163 | if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first; |
164 | it->second->push(rank,buffers[rank],startBuffer,size); |
165 | |
166 | buffer.advance(size); |
167 | count=buffer.remain(); |
168 | } |
169 | |
170 | } |
171 | |
172 | void CContextServer::processEvents(void) |
173 | { |
174 | map<size_t,CEventServer*>::iterator it; |
175 | CEventServer* event; |
176 | |
177 | it=events.find(currentTimeLine); |
178 | if (it!=events.end()) |
179 | { |
180 | event=it->second; |
181 | |
182 | if (event->isFull()) |
183 | { |
184 | if (!scheduled && CServer::eventScheduler) // Skip event scheduling for attached mode and reception on client side |
185 | { |
186 | CServer::eventScheduler->registerEvent(currentTimeLine,hashId); |
187 | scheduled=true; |
188 | } |
189 | else if (!CServer::eventScheduler || CServer::eventScheduler->queryEvent(currentTimeLine,hashId) ) |
190 | { |
191 | // When using attached mode, synchronise the processes to avoid that differents event be scheduled by differents processes |
192 | // The best way to properly solve this problem will be to use the event scheduler also in attached mode |
193 | // for now just set up a MPI barrier |
194 | if (!CServer::eventScheduler) MPI_Barrier(intraComm) ; |
195 | |
196 | CTimer::get("Process events").resume(); |
197 | dispatchEvent(*event); |
198 | CTimer::get("Process events").suspend(); |
199 | pendingEvent=false; |
200 | delete event; |
201 | events.erase(it); |
202 | currentTimeLine++; |
203 | scheduled = false; |
204 | } |
205 | } |
206 | } |
207 | } |
208 | |
209 | CContextServer::~CContextServer() |
210 | { |
211 | map<int,CServerBuffer*>::iterator it; |
212 | for(it=buffers.begin();it!=buffers.end();++it) delete it->second; |
213 | } |
214 | |
215 | |
216 | void CContextServer::dispatchEvent(CEventServer& event) |
217 | { |
218 | string contextName; |
219 | string buff; |
220 | int MsgSize; |
221 | int rank; |
222 | list<CEventServer::SSubEvent>::iterator it; |
223 | CContext::setCurrent(context->getId()); |
224 | |
225 | if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE) |
226 | { |
227 | finished=true; |
228 | info(20)<<"Server Side context <"<<context->getId()<<"> finalized"<<endl; |
229 | std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(), |
230 | iteMap = mapBufferSize_.end(), itMap; |
231 | StdSize totalBuf = 0; |
232 | for (itMap = itbMap; itMap != iteMap; ++itMap) |
233 | { |
234 | report(10)<< " Memory report : Context <"<<context->getId()<<"> : server side : memory used for buffer of each connection to client" << endl |
235 | << " +) With client of rank " << itMap->first << " : " << itMap->second << " bytes " << endl; |
236 | totalBuf += itMap->second; |
237 | } |
238 | context->finalize(); |
239 | report(0)<< " Memory report : Context <"<<context->getId()<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl; |
240 | } |
241 | else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event); |
242 | else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event); |
243 | else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event); |
244 | else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event); |
245 | else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event); |
246 | else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event); |
247 | else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event); |
248 | else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event); |
249 | else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event); |
250 | else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event); |
251 | else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event); |
252 | else if (event.classId==CField::GetType()) CField::dispatchEvent(event); |
253 | else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event); |
254 | else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event); |
255 | else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event); |
256 | else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event); |
257 | else |
258 | { |
259 | ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl); |
260 | } |
261 | } |
262 | } |