Changeset 549 for XIOS/trunk/src/context_server.cpp
- Timestamp:
- 01/26/15 14:39:26 (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/trunk/src/context_server.cpp
r511 r549 25 25 CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_) 26 26 { 27 context=parent 28 intraComm=intraComm_ 29 MPI_Comm_size(intraComm,&intraCommSize) 30 MPI_Comm_rank(intraComm,&intraCommRank) 31 interComm=interComm_ 32 int flag 33 MPI_Comm_test_inter(interComm,&flag) 27 context=parent; 28 intraComm=intraComm_; 29 MPI_Comm_size(intraComm,&intraCommSize); 30 MPI_Comm_rank(intraComm,&intraCommRank); 31 interComm=interComm_; 32 int flag; 33 MPI_Comm_test_inter(interComm,&flag); 34 34 if (flag) MPI_Comm_remote_size(interComm,&commSize); 35 else MPI_Comm_size(interComm,&commSize) 36 currentTimeLine=0 37 scheduled=false 38 finished=false 39 40 boost::hash<string> hashString 41 hashId=hashString(context->getId()) 35 else MPI_Comm_size(interComm,&commSize); 36 currentTimeLine=0; 37 scheduled=false; 38 finished=false; 39 40 boost::hash<string> hashString; 41 hashId=hashString(context->getId()); 42 42 43 43 } 44 44 void CContextServer::setPendingEvent(void) 45 45 { 46 pendingEvent=true 46 pendingEvent=true; 47 47 } 48 48 49 49 bool CContextServer::hasPendingEvent(void) 50 50 { 51 return pendingEvent 51 return pendingEvent; 52 52 } 53 53 54 54 bool CContextServer::eventLoop(void) 55 55 { 56 listen() 57 checkPendingRequest() 58 processEvents() 59 return finished 56 listen(); 57 checkPendingRequest(); 58 processEvents(); 59 return finished; 60 60 } 61 61 … … 63 63 { 64 64 int rank; 65 int flag 66 int count 67 char * addr 65 int flag; 66 int count; 67 char * addr; 68 68 MPI_Status status; 69 69 map<int,CServerBuffer*>::iterator it; … … 73 73 if (pendingRequest.find(rank)==pendingRequest.end()) 74 74 { 75 traceOff() 75 traceOff(); 76 76 MPI_Iprobe(rank,20,interComm,&flag,&status); 77 traceOn() 77 traceOn(); 78 78 if (flag==true) 79 79 { 80 it=buffers.find(rank) 80 it=buffers.find(rank); 81 81 if (it==buffers.end()) 82 82 { … … 84 84 MPI_Recv(&buffSize, 1, MPI_LONG, rank, 20, interComm, &status); 85 85 mapBufferSize_.insert(std::make_pair(rank, buffSize)); 86 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first 86 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first; 87 87 } 88 88 else 89 89 { 90 MPI_Get_count(&status,MPI_CHAR,&count) 90 MPI_Get_count(&status,MPI_CHAR,&count); 91 91 if (it->second->isBufferFree(count)) 92 92 { 93 addr=(char*)it->second->getBuffer(count) 94 MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]) 95 bufferRequest[rank]=addr 93 addr=(char*)it->second->getBuffer(count); 94 MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]); 95 bufferRequest[rank]=addr; 96 96 } 97 97 } … … 104 104 { 105 105 map<int,MPI_Request>::iterator it; 106 list<int> recvRequest 106 list<int> recvRequest; 107 107 list<int>::iterator itRecv; 108 int rank 109 int flag 110 int count 111 MPI_Status status 108 int rank; 109 int flag; 110 int count; 111 MPI_Status status; 112 112 113 113 for(it=pendingRequest.begin();it!=pendingRequest.end();it++) 114 114 { 115 rank=it->first 116 traceOff() 117 MPI_Test(& it->second, &flag, &status) 118 traceOn() 115 rank=it->first; 116 traceOff(); 117 MPI_Test(& it->second, &flag, &status); 118 traceOn(); 119 119 if (flag==true) 120 120 { 121 recvRequest.push_back(rank) 122 MPI_Get_count(&status,MPI_CHAR,&count) 123 processRequest(rank,bufferRequest[rank],count) 121 recvRequest.push_back(rank); 122 MPI_Get_count(&status,MPI_CHAR,&count); 123 processRequest(rank,bufferRequest[rank],count); 124 124 } 125 125 } … … 127 127 for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++) 128 128 { 129 pendingRequest.erase(*itRecv) 130 bufferRequest.erase(*itRecv) 129 pendingRequest.erase(*itRecv); 130 bufferRequest.erase(*itRecv); 131 131 } 132 132 } … … 135 135 { 136 136 137 CBufferIn buffer(buff,count) 138 char* startBuffer,endBuffer 139 int size, offset 140 size_t timeLine 141 map<size_t,CEventServer*>::iterator it 137 CBufferIn buffer(buff,count); 138 char* startBuffer,endBuffer; 139 int size, offset; 140 size_t timeLine; 141 map<size_t,CEventServer*>::iterator it; 142 142 143 143 while(count>0) 144 144 { 145 char* startBuffer=(char*)buffer.ptr() 146 CBufferIn newBuffer(startBuffer,buffer.remain()) 147 newBuffer>>size>>timeLine 148 149 it=events.find(timeLine) 150 if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first 151 it->second->push(rank,buffers[rank],startBuffer,size) 152 153 buffer.advance(size) 154 count=buffer.remain() 145 char* startBuffer=(char*)buffer.ptr(); 146 CBufferIn newBuffer(startBuffer,buffer.remain()); 147 newBuffer>>size>>timeLine; 148 149 it=events.find(timeLine); 150 if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first; 151 it->second->push(rank,buffers[rank],startBuffer,size); 152 153 buffer.advance(size); 154 count=buffer.remain(); 155 155 } 156 156 … … 159 159 void CContextServer::processEvents(void) 160 160 { 161 map<size_t,CEventServer*>::iterator it 162 CEventServer* event 163 164 it=events.find(currentTimeLine) 161 map<size_t,CEventServer*>::iterator it; 162 CEventServer* event; 163 164 it=events.find(currentTimeLine); 165 165 if (it!=events.end()) 166 166 { 167 event=it->second 167 event=it->second; 168 168 169 169 if (event->isFull()) … … 171 171 if (!scheduled && !CXios::isServer) 172 172 { 173 CServer::eventScheduler->registerEvent(currentTimeLine,hashId) 174 scheduled=true 173 CServer::eventScheduler->registerEvent(currentTimeLine,hashId); 174 scheduled=true; 175 175 } 176 176 else if (CXios::isServer || CServer::eventScheduler->queryEvent(currentTimeLine,hashId) ) 177 177 { 178 CTimer::get("Process events").resume() 179 dispatchEvent(*event) 180 CTimer::get("Process events").suspend() 181 pendingEvent=false 182 delete event 183 events.erase(it) 184 currentTimeLine++ 185 scheduled = false 178 CTimer::get("Process events").resume(); 179 dispatchEvent(*event); 180 CTimer::get("Process events").suspend(); 181 pendingEvent=false; 182 delete event; 183 events.erase(it); 184 currentTimeLine++; 185 scheduled = false; 186 186 } 187 187 } … … 191 191 CContextServer::~CContextServer() 192 192 { 193 map<int,CServerBuffer*>::iterator it 194 for(it=buffers.begin();it!=buffers.end();++it) delete it->second 193 map<int,CServerBuffer*>::iterator it; 194 for(it=buffers.begin();it!=buffers.end();++it) delete it->second; 195 195 } 196 196 … … 198 198 void CContextServer::dispatchEvent(CEventServer& event) 199 199 { 200 string contextName 201 string buff 202 int MsgSize 203 int rank 204 list<CEventServer::SSubEvent>::iterator it 205 CContext::setCurrent(context->getId()) 200 string contextName; 201 string buff; 202 int MsgSize; 203 int rank; 204 list<CEventServer::SSubEvent>::iterator it; 205 CContext::setCurrent(context->getId()); 206 206 207 207 if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE) 208 208 { 209 info(20)<<"Server Side context <"<<context->getId()<<"> finalized"<<endl 209 info(20)<<"Server Side context <"<<context->getId()<<"> finalized"<<endl; 210 210 std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 211 211 iteMap = mapBufferSize_.end(), itMap; … … 217 217 totalBuf += itMap->second; 218 218 } 219 context->finalize() ; 220 finished=true ; 221 report(0)<< " Memory report : Context <"<<context->getId()<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl ; 222 } 223 else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event) ; 224 else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event) ; 225 else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event) ; 226 else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event) ; 227 else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event) ; 228 else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event) ; 229 else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event) ; 230 else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event) ; 231 else if (event.classId==CField::GetType()) CField::dispatchEvent(event) ; 232 else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event) ; 233 else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event) ; 234 else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event) ; 235 else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event) ; 219 context->finalize(); 220 finished=true; 221 report(0)<< " Memory report : Context <"<<context->getId()<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl; 222 } 223 else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event); 224 else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event); 225 else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event); 226 else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event); 227 else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event); 228 else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event); 229 else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event); 230 else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event); 231 else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event); 232 else if (event.classId==CField::GetType()) CField::dispatchEvent(event); 233 else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event); 234 else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event); 235 else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event); 236 else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event); 236 237 else 237 238 { 238 ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl) 239 ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl); 239 240 } 240 241 }
Note: See TracChangeset
for help on using the changeset viewer.