Changeset 2399
- Timestamp:
- 09/09/22 17:23:16 (21 months ago)
- Location:
- XIOS3/trunk/src
- Files:
-
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS3/trunk/src/server.cpp
r2335 r2399 60 60 else is_MPI_Initialized=false ; 61 61 MPI_Comm globalComm=CXios::getGlobalComm() ; 62 62 CTimer::get("XIOS server").resume() ; 63 63 ///////////////////////////////////////// 64 64 ///////////// PART 1 //////////////////// … … 409 409 { 410 410 CTimer::get("XIOS").suspend() ; 411 411 CTimer::get("XIOS server").suspend() ; 412 412 delete eventScheduler ; 413 413 -
XIOS3/trunk/src/timer.hpp
r2274 r2399 20 20 void resume(void); 21 21 void reset(void); 22 void add(double time) { cumulatedTime+=time ;} 23 void minus(double time) { cumulatedTime-=time ;} 22 24 double getCumulatedTime(void); 23 25 static std::map<std::string,CTimer> allTimer; … … 26 28 static std::string getAllCumulatedTime(void) ; 27 29 static void release(void) { allTimer.clear() ;} 30 bool isSuspended() { return suspended; } 28 31 }; 29 32 } -
XIOS3/trunk/src/transport/one_sided_client_buffer.cpp
r2347 r2399 43 43 CXios::getMpiGarbageCollector().registerWindow(winControl_) ; 44 44 45 46 47 CTimer::get("create Windows").suspend() ;48 49 45 MPI_Barrier(winComm_) ; 50 46 MPI_Win_attach(winControl_, control_, controlSize_*sizeof(MPI_Aint)) ; 51 47 MPI_Barrier(winComm_) ; 48 CTimer::get("create Windows").suspend() ; 49 52 50 // MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, winControl_) ; 53 51 // MPI_Win_unlock(0,winControl_) ; … … 217 215 { 218 216 request = requests_.front() ; 217 if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Test").resume() ; 219 218 MPI_Test(&request.mpiRequest, &flag, MPI_STATUS_IGNORE) ; 219 if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Test").suspend() ; 220 220 if (flag==true) 221 221 { … … 260 260 } 261 261 } 262 if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").resume() ; 262 263 MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, serverRank_, 20, interComm_, &request.mpiRequest ) ; 264 if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").suspend() ; 263 265 info(logProtocol)<<outStr.str()<<endl ; 264 266 requests_.push_back(request) ; -
XIOS3/trunk/src/transport/one_sided_client_buffer.hpp
r2343 r2399 190 190 bool fixed_=false; 191 191 size_t fixedSize_ = 0 ; 192 size_t currentBufferSize_= 0;192 size_t currentBufferSize_= 0 ; 193 193 double growingFactor_ = 2. ; 194 194 MPI_Aint lastFreedBloc_=0 ; -
XIOS3/trunk/src/transport/one_sided_context_client.cpp
r2343 r2399 96 96 } 97 97 itBuffer->second->eventLoop() ; 98 double time=CTimer::getTime() ; 98 99 bool succed = itBuffer->second->writeEvent(timeLine, event) ; 100 if (succed) 101 { 102 time=CTimer::getTime()-time ; 103 if (!CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").minus(time) ; 104 } 105 99 106 if (succed) event.remove() ; 100 107 else event.next() ; 101 if (event.isFirst()) callGlobalEventLoop() ; 102 } 103 108 if (event.isFirst()) 109 { 110 if (CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").resume() ; 111 callGlobalEventLoop() ; 112 } 113 } 114 if (!CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").suspend() ; 115 104 116 if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 105 117 { -
XIOS3/trunk/src/transport/one_sided_context_server.cpp
r2343 r2399 93 93 int flag; 94 94 MPI_Status status; 95 96 traceOff(); 97 MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm, &flag, &status); 98 traceOn(); 99 if (flag==true) 100 { 101 requests_.push_back(CRequest(interComm, status)) ; 102 if (requests_.back().test()) 103 { 104 processRequest(requests_.back()) ; 105 requests_.pop_back() ; 95 flag=true ; 96 97 while(flag) 98 { 99 traceOff(); 100 MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm, &flag, &status); 101 traceOn(); 102 if (flag==true) 103 { 104 requests_.push_back(CRequest(interComm, status)) ; 105 if (requests_.back().test()) 106 { 107 processRequest(requests_.back()) ; 108 requests_.pop_back() ; 109 } 106 110 } 107 111 } … … 139 143 if (!pendingEvents_.empty()) 140 144 { 145 /* 141 146 SPendingEvent& nextEvent = pendingEvents_.begin()->second ; 142 147 for(auto& buffer : nextEvent.buffers ) buffer->eventLoop() ; 143 148 if (nextEvent.nbSenders==0) pendingEvents_.erase(pendingEvents_.begin()) ; 149 */ 150 for(auto it=pendingEvents_.begin() ; it!=pendingEvents_.end() ;) 151 { 152 SPendingEvent& nextEvent = it->second ; 153 for(auto& buffer : nextEvent.buffers ) buffer->eventLoop() ; 154 if (nextEvent.nbSenders==0) it=pendingEvents_.erase(it) ; 155 else ++it ; 156 } 144 157 } 145 158 } … … 188 201 CEventServer event(this) ; 189 202 for(auto& buffer : it->second.buffers) buffer->fillEventServer(currentTimeLine, event) ; 190 203 MPI_Barrier(intraComm) ; 191 204 CTimer::get("Process events").resume(); 192 205 info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event.classId<<" of type "<<event.type<<endl ; … … 281 294 else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event); 282 295 else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event); 283 else if (event.classId==CField::GetType()) CField::dispatchEvent(event); 296 else if (event.classId==CField::GetType()) 297 { 298 if (event.type==CField::EVENT_ID_UPDATE_DATA) CField::dispatchEvent(event); 299 else CField::dispatchEvent(event); 300 } 284 301 else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event); 285 302 else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event); -
XIOS3/trunk/src/transport/one_sided_server_buffer.cpp
r2346 r2399 61 61 else // receive standard event 62 62 { 63 63 info(logProtocol)<<"received request from rank : "<<clientRank_<<" with timeline : "<<timeline 64 <<" at time : "<<CTimer::get("XIOS server").getTime()<<endl ; 64 65 bufferIn>> nbSenders ; 65 66 nbSenders_[timeline] = nbSenders ; … … 101 102 { 102 103 info(logProtocol)<<"Send bloc to free : "<<lastBlocToFree_<<endl ; 104 if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").resume() ; 103 105 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ; 104 106 MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_ADDR*sizeof(MPI_Aint)) ; 105 107 MPI_Put(&lastBlocToFree_, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ; 106 108 MPI_Win_unlock(windowRank_,winControl_) ; 109 if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").suspend() ; 107 110 lastBlocToFree_ = 0 ; 108 111 } … … 126 129 if (!pendingRmaRequests_.empty()) 127 130 { 128 int flag ; 131 int flag ; 132 133 if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").resume() ; 129 134 MPI_Testall(pendingRmaRequests_.size(), pendingRmaRequests_.data(), &flag, pendingRmaStatus_.data()) ; 135 if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").suspend() ; 136 130 137 if (flag==true) 131 138 { … … 134 141 { 135 142 info(logProtocol)<<"unlock window "<<win<<endl ; 143 if (info.isActive(logProtocol)) CTimer::get("transfer unlock").resume() ; 136 144 MPI_Win_unlock(windowRank_,windows_[win]) ; 145 if (info.isActive(logProtocol)) CTimer::get("transfer unlock").suspend() ; 137 146 } 138 147 windowsLocked_.clear() ; 139 148 149 150 if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).suspend() ; 151 if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).suspend() ; 152 153 size_t transferedSize = 0 ; 154 for(auto& count : pendingRmaCount_) transferedSize+=count ; 155 156 if (info.isActive(logProtocol)) 157 { 158 double time = CTimer::get("lastTransfer from "+std::to_string(clientRank_)).getCumulatedTime() ; 159 info(logProtocol)<<"Tranfer message from rank : "<<clientRank_<<" nbBlocs : "<< pendingRmaStatus_.size() 160 << " total count = "<<transferedSize<<" duration : "<<time<<" s" 161 << " Bandwith : "<< transferedSize/time<< "byte/s"<<endl ; 162 CTimer::get("lastTransfer from "+std::to_string(clientRank_)).reset() ; 163 } 164 140 165 isLocked_=false ; 141 166 pendingRmaRequests_.clear() ; 142 167 pendingRmaStatus_.clear() ; 168 pendingRmaCount_.clear() ; 143 169 completedEvents_.insert(onTransferEvents_.begin(),onTransferEvents_.end()) ; 144 170 … … 195 221 if (bufferResize_.front().first==timeline) 196 222 { 197 currentBufferSize_=bufferResize_.front().second ;223 currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ; 198 224 info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<" at timeline="<<timeline<<endl ; 199 225 bufferResize_.pop_front() ; … … 216 242 217 243 if (isLocked_) ERROR("void COneSidedServerBuffer::transferEvents(void)",<<"windows is Locked"); 244 245 if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).resume() ; 246 if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).resume() ; 218 247 for(auto& bloc : blocs) 219 248 { … … 221 250 if (windowsLocked_.count(win)==0) 222 251 { 252 info(logProtocol)<<"lock window "<<win<<endl ; 253 if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ; 223 254 MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ; 255 if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ; 224 256 windowsLocked_.insert(win) ; 225 257 } … … 233 265 pendingBlocs_.erase(pendingBlocs_.begin()) ; 234 266 235 //break ; // transfering just one event temporary => to remove267 // break ; // transfering just one event temporary => to remove 236 268 237 269 if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop … … 244 276 if (bufferResize_.front().first==timeline) 245 277 { 246 currentBufferSize_=bufferResize_.front().second ;278 currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ; 247 279 info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<" at timeline="<<timeline<<endl ; 248 280 bufferResize_.pop_front() ; … … 259 291 if (windowsLocked_.count(win)==0) 260 292 { 293 info(logProtocol)<<"lock window "<<win<<endl ; 294 if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ; 261 295 MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ; 296 if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ; 262 297 windowsLocked_.insert(win) ; 263 298 } … … 331 366 MPI_Request request ; 332 367 MPI_Aint offsetAddr=MPI_Aint_add(addr, offset) ; 333 info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<" addr="<<addr<<" count="<<count<<" buffer="<<buffer<<" start="<<start<<endl ; 334 info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<" end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1) 335 <<" start="<<static_cast<void*>(buffer->getBuffer()+start)<<" end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ; 368 if (info.isActive(logProtocol)) 369 { 370 info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<" addr="<<addr<<" count="<<count<<" buffer="<<buffer<<" start="<<start<<endl ; 371 info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<" end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1) 372 <<" start="<<static_cast<void*>(buffer->getBuffer()+start)<<" end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ; 373 } 374 if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").resume() ; 336 375 MPI_Rget(buffer->getBuffer()+start, count, MPI_CHAR, windowRank_, offsetAddr, count, MPI_CHAR, windows_[window], &request) ; 376 if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").suspend() ; 337 377 pendingRmaRequests_.push_back(request) ; 378 pendingRmaCount_.push_back(count) ; 338 379 onTransferEvents_[timeline].push_back({buffer,start,count,addr}) ; 339 380 } … … 348 389 349 390 ostringstream outStr ; 350 outStr<<"Received Event from client "<<clientRank_<<" timeline="<<timeline<<" nbBlocs="<<completedEvent.size()<<endl ; 391 if (info.isActive(logProtocol)) outStr<<"Received Event from client "<<clientRank_<<" timeline="<<timeline 392 <<" nbBlocs="<<completedEvent.size()<<endl ; 351 393 int i=0 ; 352 394 MPI_Aint addr ; -
XIOS3/trunk/src/transport/one_sided_server_buffer.hpp
r2343 r2399 132 132 size_t currentBufferSize_=0 ; 133 133 double growingFactor_ = 2. ; 134 double bufferServerFactor_=10. ; 134 135 135 136 std::list<CBuffer*> buffers_ ; … … 144 145 vector<MPI_Request> pendingRmaRequests_ ; 145 146 vector<MPI_Status> pendingRmaStatus_ ; 147 vector<int> pendingRmaCount_ ; 146 148 147 149 map<size_t, list<SBloc>> onTransferEvents_ ; // map<size_t timeline, list<pair<char* bloc, int count>>>
Note: See TracChangeset
for help on using the changeset viewer.