Changeset 2246
- Timestamp:
- 10/11/21 14:41:56 (3 years ago)
- Location:
- XIOS/dev/dev_ym/XIOS_COUPLING/src
- Files:
-
- 1 added
- 25 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.cpp
r2221 r2246 8 8 #include "tracer.hpp" 9 9 #include "timeline_events.hpp" 10 #include "timer.hpp" 10 11 11 12 namespace xios … … 29 30 else hasWindows=true ; 30 31 31 MPI_Alloc_mem(bufferSize+headerSize , MPI_INFO_NULL, &bufferHeader[0]) ;32 MPI_Alloc_mem(bufferSize+headerSize , MPI_INFO_NULL, &bufferHeader[1]) ;33 buffer[0] = bufferHeader[0]+headerSize ;34 buffer[1] = bufferHeader[1]+headerSize ;35 firstTimeLine[0]=(size_t*)bufferHeader[0] ;36 firstTimeLine[1]=(size_t*)bufferHeader[1] ;37 bufferCount[0]=(size_t*)bufferHeader[0] + 1;38 bufferCount[1]=(size_t*)bufferHeader[1] + 1;39 control[0]=(size_t*)bufferHeader[0] + 2;40 control[1]=(size_t*)bufferHeader[1] + 2;41 finalize[0]=(size_t*)bufferHeader[0] +3;42 finalize[1]=(size_t*)bufferHeader[1] +3;32 MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 33 MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 34 buffer[0] = bufferHeader[0]+headerSize_ ; 35 buffer[1] = bufferHeader[1]+headerSize_ ; 36 firstTimeLine[0]=(size_t*)bufferHeader[0] + timeLineOffset_ ; 37 firstTimeLine[1]=(size_t*)bufferHeader[1] + timeLineOffset_ ; 38 bufferCount[0]=(size_t*)bufferHeader[0] + countOffset_ ; 39 bufferCount[1]=(size_t*)bufferHeader[1] + countOffset_ ; 40 control[0]=(size_t*)bufferHeader[0] + controlOffset_ ; 41 control[1]=(size_t*)bufferHeader[1] + controlOffset_ ; 42 notify[0]=(size_t*)bufferHeader[0] + notifyOffset_ ; 43 notify[1]=(size_t*)bufferHeader[1] + notifyOffset_ ; 43 44 44 45 *firstTimeLine[0]=0 ; … … 48 49 *control[0]=0 ; 49 50 *control[1]=0 ; 50 * finalize[0]=0;51 * finalize[1]=0;51 *notify[0]=notifyNothing_ ; 52 *notify[1]=notifyNothing_ ; 52 53 winState[0]=false ; 53 54 winState[1]=false ; … … 57 58 { 58 59 59 MPI_Aint buffSize=bufferSize+headerSize ;60 MPI_Aint buffSize=bufferSize+headerSize_ ; 60 61 MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ; 61 62 MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ; … … 106 107 } 107 108 108 /* void CClientBuffer::createWindows(MPI_Comm oneSidedComm)109 {110 MPI_Barrier(oneSidedComm) ;111 MPI_Win_create(bufferHeader[0], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ;112 MPI_Win_create(bufferHeader[1], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ;113 114 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ;115 *firstTimeLine[0]=0 ;116 *bufferCount[0]=0 ;117 *control[0]=0 ;118 MPI_Win_unlock(0, windows[0]) ;119 120 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ;121 *firstTimeLine[1]=0 ;122 *bufferCount[1]=0 ;123 *control[1]=0 ;124 MPI_Win_unlock(0, windows[1]) ;125 winState[0]=false ;126 winState[1]=false ;127 MPI_Barrier(oneSidedComm) ;128 hasWindows=true ;129 }130 */131 132 /*133 void CClientBuffer::freeWindows()134 {135 if (hasWindows)136 {137 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[0]) ;138 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[1]) ;139 *control[0]=2 ;140 *control[1]=2 ;141 MPI_Win_unlock(0, windows_[1]) ;142 MPI_Win_unlock(0, windows_[0]) ;143 144 MPI_Win_free(&windows_[0]) ;145 MPI_Win_free(&windows_[1]) ;146 hasWindows=false ;147 }148 }149 */150 109 void CClientBuffer::lockBuffer(void) 151 110 { 152 if (hasWindows) 153 { 154 // MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[current]) ; 155 long long int lock=1 ; 156 long long int zero=0, one=1 ; 157 111 CTimer::get("lock buffer").resume(); 112 if (hasWindows) 113 { 158 114 MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ; 159 160 while(lock!=0)161 {162 MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)),163 windows_[current]) ;164 MPI_Win_flush(clientRank_, windows_[current]) ;165 }166 167 // info(100)<<"Buffer locked "<<&windows_<<" "<<current<<endl ;168 115 winState[current]=true ; 169 116 } 117 CTimer::get("lock buffer").suspend(); 170 118 } 171 119 172 120 void CClientBuffer::unlockBuffer(void) 173 121 { 174 if (hasWindows) 175 { 176 long long int lock=1 ; 177 long long int zero=0, one=1 ; 178 179 MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)), 180 windows_[current]) ; 122 CTimer::get("unlock buffer").resume(); 123 if (hasWindows) 124 { 181 125 MPI_Win_unlock(clientRank_, windows_[current]) ; 182 183 // info(100)<<"Buffer unlocked "<<&windows_<<" "<<current<<endl ;184 126 winState[current]=false ; 185 127 } 128 CTimer::get("unlock buffer").suspend(); 186 129 } 187 130 … … 193 136 bool CClientBuffer::isBufferFree(StdSize size) 194 137 { 195 // bool loop=true ;196 // while (loop)197 // {198 // lockBuffer();199 // if (*control[current]==0) loop=false ; // attemp to read from server ?200 // else unlockBuffer() ;201 // }202 138 203 139 lockBuffer(); … … 208 144 if (size > bufferSize) 209 145 { 210 // ERROR("bool CClientBuffer::isBufferFree(StdSize size)",211 // << "The requested size (" << size << " bytes) is too big to fit the buffer (" << bufferSize << " bytes), please increase the client buffer size." << endl);212 146 resizingBufferStep_=1 ; 147 *firstTimeLine[current]=0 ; 213 148 newBufferSize_=size ; 214 149 return false ; … … 231 166 { 232 167 resizingBufferStep_ = 1 ; 168 *firstTimeLine[current]=0 ; 233 169 newBufferSize_ = (count+size)*growFactor_ ; 234 170 } … … 247 183 if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ; 248 184 *bufferCount[current]=count ; 249 /* info(50)<<"CClientBuffer::getBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current250 <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ;251 if (!winState[current]) info(40)<<"CClientBuffer::getBuffer "<<" Windows Not Locked... "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current252 <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ;*/253 185 return retBuffer; 254 186 } … … 284 216 MPI_Status status; 285 217 int flag; 218 219 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 220 MPI_Win_unlock(clientRank_, windows_[0]) ; 221 222 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 223 MPI_Win_unlock(clientRank_, windows_[1]) ; 286 224 287 225 if (pending) … … 299 237 if (count > 0) 300 238 { 301 lockBuffer() ; 302 // if (*control[current]==0 && bufferCount[current] > 0) 303 if (*bufferCount[current] > 0) 239 double time=MPI_Wtime() ; 240 if (time - lastCheckedWithNothing_ > latency_) 304 241 { 305 MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 306 if (resizingBufferStep_==3) resizingBufferStep_=0 ; 307 pending = true; 308 // *control[current]=0 ; 309 *firstTimeLine[current]=0 ; 310 *bufferCount[current]=0 ; 311 312 unlockBuffer() ; 313 314 if (current == 1) current = 0; 315 else current = 1; 316 count = 0; 317 } 318 else 319 { 320 unlockBuffer() ; 242 lockBuffer() ; 243 if (*bufferCount[current] > 0) 244 { 245 MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 246 if (resizingBufferStep_==4) resizingBufferStep_=0 ; 247 pending = true; 248 *firstTimeLine[current]=0 ; 249 *bufferCount[current]=0 ; 250 251 unlockBuffer() ; 252 253 if (current == 1) current = 0; 254 else current = 1; 255 count = 0; 256 } 257 else 258 { 259 unlockBuffer() ; 260 lastCheckedWithNothing_ = time ; 261 } 321 262 } 322 263 } 323 264 else 324 265 { 325 if (resizingBufferStep_==2) resizeBuffer(newBufferSize_) ;326 266 if (resizingBufferStep_==1) resizeBufferNotify() ; 267 else if (resizingBufferStep_==2) isNotifiedChangeBufferSize() ; 268 else if (resizingBufferStep_==3) resizeBuffer(newBufferSize_) ; 327 269 } 328 270 } … … 345 287 void CClientBuffer::resizeBuffer(size_t newSize) 346 288 { 289 347 290 if (hasWindows) 348 291 { … … 354 297 355 298 bufferSize=newSize ; 356 MPI_Alloc_mem(bufferSize+headerSize , MPI_INFO_NULL, &bufferHeader[0]) ;357 MPI_Alloc_mem(bufferSize+headerSize , MPI_INFO_NULL, &bufferHeader[1]) ;358 buffer[0] = bufferHeader[0]+headerSize ;359 buffer[1] = bufferHeader[1]+headerSize ;360 firstTimeLine[0]=(size_t*)bufferHeader[0] ;361 firstTimeLine[1]=(size_t*)bufferHeader[1] ;362 bufferCount[0]=(size_t*)bufferHeader[0] + 1;363 bufferCount[1]=(size_t*)bufferHeader[1] + 1;364 control[0]=(size_t*)bufferHeader[0] + 2 ;365 control[1]=(size_t*)bufferHeader[1] + 2;366 finalize[0]=(size_t*)bufferHeader[0] +3;367 finalize[1]=(size_t*)bufferHeader[1] +3;299 MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 300 MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 301 buffer[0] = bufferHeader[0]+headerSize_ ; 302 buffer[1] = bufferHeader[1]+headerSize_ ; 303 firstTimeLine[0]=(size_t*)bufferHeader[0] + timeLineOffset_; 304 firstTimeLine[1]=(size_t*)bufferHeader[1] + timeLineOffset_; 305 bufferCount[0]=(size_t*)bufferHeader[0] + countOffset_ ; 306 bufferCount[1]=(size_t*)bufferHeader[1] + countOffset_ ; 307 control[0]=(size_t*)bufferHeader[0] + controlOffset_ ; // control=0 => nothing ; control=1 => changeBufferSize 308 control[1]=(size_t*)bufferHeader[1] + controlOffset_ ; 309 notify[0]=(size_t*)bufferHeader[0] + notifyOffset_ ; 310 notify[1]=(size_t*)bufferHeader[1] + notifyOffset_ ; 368 311 369 312 *firstTimeLine[0]=0 ; … … 373 316 *control[0]=0 ; 374 317 *control[1]=0 ; 375 * finalize[0]=0;376 * finalize[1]=0;318 *notify[0] = notifyNothing_ ; 319 *notify[1] = notifyNothing_ ; 377 320 winState[0]=false ; 378 321 winState[1]=false ; … … 382 325 { 383 326 384 MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize ) ;385 MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize ) ;327 MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize_) ; 328 MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize_) ; 386 329 387 330 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; … … 402 345 bufOut->put(this->getWinAddress(1)); 403 346 404 resizingBufferStep_=3; 405 unlockBuffer() ; 347 resizingBufferStep_=4; 348 unlockBuffer() ; 349 info(100)<<"CClientBuffer::resizeBuffer(size_t newSize) : resizing buffer of server "<<serverRank<<" ; new size : "<<newSize<<" ; winAdress[0] "<<this->getWinAddress(0)<<" winAdress[1] "<<this->getWinAddress(1)<<endl; 406 350 } 407 351 … … 416 360 } 417 361 418 bool CClientBuffer::isNotified Finalized(void)362 bool CClientBuffer::isNotifiedChangeBufferSize(void) 419 363 { 420 364 421 365 bool ret ; 422 366 lockBuffer() ; 423 ret=*finalize[current] == 1 ? true : false ; 367 ret=*notify[current] == notifyResizeBuffer_ ? true : false ; 368 if (ret) 369 { 370 *notify[current] = notifyNothing_ ; 371 resizingBufferStep_=3; 372 } 424 373 unlockBuffer() ; 425 374 … … 427 376 } 428 377 378 bool CClientBuffer::isNotifiedFinalized(void) 379 { 380 381 bool ret ; 382 lockBuffer() ; 383 ret=*notify[current] == notifyFinalize_ ? true : false ; 384 unlockBuffer() ; 385 386 return ret; 387 } 388 429 389 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.hpp
r2130 r2246 2 2 #define __BUFFER_CLIENT_HPP__ 3 3 4 #include "buffer_cs_base.hpp" 4 5 #include "xios_spl.hpp" 5 6 #include "buffer_out.hpp" … … 9 10 namespace xios 10 11 { 11 class CClientBuffer 12 class CClientBuffer : public CBufferClientServerBase 12 13 { 13 14 public: … … 35 36 void resizeBuffer(size_t newSize) ; 36 37 void resizeBufferNotify(void) ; 38 bool isNotifiedChangeBufferSize(void) ; 37 39 38 40 … … 42 44 size_t* bufferCount[2] ; 43 45 size_t* control[2] ; 44 size_t* finalize[2] ;46 size_t* notify[2] ; 45 47 bool winState[2] ; 46 48 int current; … … 67 69 std::vector<MPI_Win> windows_ ; 68 70 bool hasWindows ; 69 static const int headerSize=4*sizeof(size_t); 71 72 double latency_=1e-2 ; 73 double lastCheckedWithNothing_=0 ; 70 74 }; 71 75 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_server.cpp
r2130 r2246 2 2 #include "exception.hpp" 3 3 #include "buffer_server.hpp" 4 #include "timer.hpp" 4 5 5 6 … … 31 32 } 32 33 33 /*34 void CServerBuffer::createWindows(MPI_Comm oneSidedComm)35 {36 MPI_Barrier(oneSidedComm) ;37 MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ;38 MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ;39 hasWindows=true ;40 updateCurrentWindows() ;41 MPI_Barrier(oneSidedComm) ;42 }43 */44 45 /*46 bool CServerBuffer::freeWindows()47 {48 if (hasWindows)49 {50 size_t header[3] ;51 size_t& control=header[2] ;52 MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows_[0]) ;53 MPI_Get(&control, 1, MPI_LONG_LONG_INT, windowsRank , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[0]) ;54 MPI_Win_unlock(0,windows[0]) ;55 if (control==2) // ok for free windows56 {57 MPI_Win_free( &(windows[0])) ;58 MPI_Win_free( &(windows[1])) ;59 hasWindows=false ;60 return true ;61 }62 else return false ;63 }64 else return true ;65 }66 */67 34 68 35 bool CServerBuffer::isBufferFree(size_t count) … … 222 189 bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count) 223 190 { 191 count = -1 ; 224 192 if (!hasWindows || resizingBuffer_) return false ; 225 226 227 size_t header[3];228 size_t& clientTimeline=header[0] ;229 size_t & clientCount=header[1];230 size_t & control=header[2];193 double time=MPI_Wtime() ; 194 if (time-bufferFromClientTime_ < bufferFromClientLatency_ ) return false; 195 bufferFromClientTime_ = time ; 196 CTimer::get("getBufferFromClient").resume() ; 197 size_t clientTimeline ; 198 size_t clientCount ; 231 199 bool ok=false ; 232 200 … … 238 206 239 207 lockBuffer(); 240 208 CTimer::get("getBufferFromClient_locked").resume() ; 241 209 // lock is acquired 242 210 243 MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], 0), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;244 MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;211 MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 212 MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 245 213 MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 246 214 247 // control=1 ;248 // MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;249 250 // MPI_Win_unlock(windowsRank_, windows_[currentWindows]) ;251 MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;252 // info(100)<<"getBufferFromClient : windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ;253 215 if (timeLine==clientTimeline) 254 216 { 255 // info(50)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ;256 257 // MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ;258 217 buffer=(char*)getBuffer(clientCount) ; 259 218 count=clientCount ; … … 261 220 clientTimeline = 0 ; 262 221 clientCount = 0 ; 263 // control=0;264 MPI_Put(& header[0], 2, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],0) , 2, MPI_LONG_LONG_INT,windows_[currentWindows]) ;222 MPI_Put(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 223 MPI_Put(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 265 224 266 225 // release lock 267 unlockBuffer() ; 226 CTimer::get("getBufferFromClient_locked").suspend() ; 227 unlockBuffer() ; 268 228 269 229 ok=true ; … … 283 243 else 284 244 { 285 //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 286 //control=0 ; 287 //MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 245 count=0 ; 288 246 289 247 // release lock 248 CTimer::get("getBufferFromClient_locked").suspend() ; 290 249 unlockBuffer() ; 291 250 } 292 251 CTimer::get("getBufferFromClient").suspend() ; 293 252 if (ok) return true ; 294 253 … … 299 258 { 300 259 if (!hasWindows) return ; 301 302 long long int lock=1 ;303 long long int zero=0, one=1 ;304 // control=1 ;305 260 MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 306 while(lock!=0)307 {308 MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)),309 windows_[currentWindows]) ;310 MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;311 }312 261 } 313 262 … … 315 264 { 316 265 if (!hasWindows) return ; 317 long long int lock=1 ;318 long long int zero=0, one=1 ;319 320 MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)),321 windows_[currentWindows]) ;322 MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;323 266 MPI_Win_unlock(windowsRank_,windows_[currentWindows]) ; 324 267 } … … 327 270 { 328 271 if (!hasWindows) return ; 329 size_t finalize=1;272 size_t notify=notifyFinalize_ ; 330 273 lockBuffer(); 331 274 // lock is acquired 332 MPI_Put(& finalize, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],3*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;275 MPI_Put(¬ify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 333 276 unlockBuffer() ; 334 277 } 278 279 void CServerBuffer::notifyBufferResizing(void) 280 { 281 resizingBuffer_=true ; 282 if (!hasWindows) return ; 283 size_t notify=notifyResizeBuffer_ ; 284 lockBuffer(); 285 // lock is acquired 286 MPI_Put(¬ify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 287 unlockBuffer() ; 288 } 335 289 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_server.hpp
r2130 r2246 2 2 #define __BUFFER_SERVER_HPP__ 3 3 4 #include "buffer_cs_base.hpp" 4 5 #include "xios_spl.hpp" 5 6 #include "buffer.hpp" … … 9 10 namespace xios 10 11 { 11 class CServerBuffer 12 class CServerBuffer : public CBufferClientServerBase 12 13 { 13 14 public: … … 26 27 void unlockBuffer(void) ; 27 28 void notifyClientFinalize(void) ; 28 void notifyBufferResizing(void) { resizingBuffer_=true ;}29 void notifyBufferResizing(void) ; 29 30 private: 30 31 char* buffer; … … 40 41 bool hasWindows ; 41 42 int windowsRank_ ; 43 double bufferFromClientLatency_=1e-1 ; 44 double bufferFromClientTime_ = 0; 45 42 46 }; 43 47 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/context_client.cpp
r2240 r2246 50 50 computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader); 51 51 52 if (flag) 53 { 54 MPI_Intercomm_merge(interComm_,false, &interCommMerged) ; 55 int interCommMergedRank; 56 MPI_Comm_rank(interComm_, &interCommMergedRank); 57 MPI_Comm_rank(interCommMerged, &interCommMergedRank); 58 MPI_Comm_rank(intraComm, &interCommMergedRank); 59 } 52 if (flag) MPI_Intercomm_merge(interComm_,false, &interCommMerged) ; 60 53 61 54 if (!isAttachedModeEnabled()) 62 55 { 56 57 CTimer::get("create Windows").resume() ; 58 59 // We create dummy pair of intercommunicator between clients and server 60 // Why ? Just because on openMPI, it reduce the creation time of windows otherwhise which increase quadratically 61 // We don't know the reason 62 63 MPI_Comm commSelf ; 64 MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ; 65 vector<MPI_Comm> dummyComm(serverSize) ; 66 for(int rank=0; rank<serverSize; rank++) MPI_Intercomm_create(commSelf, 0, interCommMerged, clientSize+rank, 0, &dummyComm[rank]) ; 67 68 // create windows for one-sided 63 69 windows.resize(serverSize) ; 64 70 MPI_Comm winComm ; … … 67 73 windows[rank].resize(2) ; 68 74 MPI_Comm_split(interCommMerged, rank, clientRank, &winComm); 69 int myRank ;70 MPI_Comm_rank(winComm,&myRank);71 75 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][0]); 72 76 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][1]); 73 77 // ym : Warning : intelMPI doesn't support that communicator of windows be deallocated before the windows deallocation, crash at MPI_Win_lock 74 78 // Bug or not ? 75 // MPI_Comm_free(&winComm) ; 76 } 77 } 78 79 MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ; 79 // MPI_Comm_free(&winComm) ; 80 } 81 82 // free dummy intercommunicator => take times ? 83 for(int rank=0; rank<serverSize; rank++) MPI_Comm_free(&dummyComm[rank]) ; 84 MPI_Comm_free(&commSelf) ; 85 86 CTimer::get("create Windows").resume() ; 87 } 80 88 81 89 auto time=chrono::system_clock::now().time_since_epoch().count() ; … … 281 289 } 282 290 291 double lastTimeBuffersNotFree=0. ; 292 double time ; 293 bool doUnlockBuffers ; 283 294 CTimer::get("Blocking time").resume(); 284 295 do 285 296 { 286 297 areBuffersFree = true; 287 for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 288 { 289 areBuffersFree &= (*itBuffer)->isBufferFree(*itSize); 290 } 298 doUnlockBuffers=false ; 299 time=MPI_Wtime() ; 300 if (time-lastTimeBuffersNotFree > latency_) 301 { 302 for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 303 { 304 areBuffersFree &= (*itBuffer)->isBufferFree(*itSize); 305 } 306 if (!areBuffersFree) 307 { 308 lastTimeBuffersNotFree = time ; 309 doUnlockBuffers=true ; 310 } 311 } 312 else areBuffersFree = false ; 291 313 292 314 if (!areBuffersFree) 293 315 { 294 for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer();316 if (doUnlockBuffers) for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer(); 295 317 checkBuffers(); 296 /* 297 context->server->listen(); 298 299 if (context->serverPrimServer.size()>0) 300 { 301 for (int i = 0; i < context->serverPrimServer.size(); ++i) context->serverPrimServer[i]->listen(); 302 //ym CServer::contextEventLoop(false) ; // avoid dead-lock at finalize... 303 context->globalEventLoop() ; 304 } 305 */ 306 context_->globalEventLoop() ; 318 319 context_->globalEventLoop() ; 307 320 } 308 321 … … 383 396 } 384 397 } 385 386 398 } 387 399 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/context_client.hpp
r2130 r2246 94 94 MPI_Comm intraComm; //!< Communicator of client group 95 95 96 MPI_Comm commSelf; //!< Communicator of the client alone. Needed to create a new communicator between 1 proc client and 1 proc server for one sided communication97 98 96 map<int,CClientBuffer*> buffers; //!< Buffers for connection to servers 99 97 … … 126 124 CContextServer* associatedServer_ ; //!< The server associated to the pair client/server 127 125 bool isGrowableBuffer_ = true ; 126 127 double latency_=1e-2 ; 128 128 }; 129 129 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp
r2240 r2246 46 46 else attachedMode=true ; 47 47 48 if (flag) MPI_Comm_remote_size(interComm,&commSize); 49 else MPI_Comm_size(interComm,&commSize); 48 int clientSize ; 49 if (flag) MPI_Comm_remote_size(interComm,&clientSize); 50 else MPI_Comm_size(interComm,&clientSize); 50 51 51 52 … … 75 76 if (!isAttachedModeEnabled()) 76 77 { 78 CTimer::get("create Windows").resume() ; 79 77 80 MPI_Intercomm_merge(interComm_,true,&interCommMerged) ; 78 // create windows for one sided comm 79 int interCommMergedRank; 81 82 // We create dummy pair of intercommunicator between clients and server 83 // Why ? Just because on openMPI, it reduce the creation time of windows otherwhise which increase quadratically 84 // We don't know the reason 85 MPI_Comm commSelf ; 86 MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf) ; 87 vector<MPI_Comm> dummyComm(clientSize) ; 88 for(int rank=0; rank<clientSize ; rank++) MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0 , &dummyComm[rank]) ; 89 90 // create windows for one sided comm 80 91 MPI_Comm winComm ; 81 MPI_Comm_rank(intraComm, &interCommMergedRank);82 92 windows.resize(2) ; 83 for(int rank=c ommSize; rank<commSize+intraCommSize; rank++)84 { 85 if (rank==c ommSize+interCommMergedRank)93 for(int rank=clientSize; rank<clientSize+intraCommSize; rank++) 94 { 95 if (rank==clientSize+intraCommRank) 86 96 { 87 MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm); 88 int myRank ; 89 MPI_Comm_rank(winComm,&myRank); 97 MPI_Comm_split(interCommMerged, intraCommRank, rank, &winComm); 90 98 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[0]); 91 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]); 99 MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]); 92 100 } 93 else MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm); 94 // ym : Warning : intelMPI doesn't support that communicator of windows be deallocated before the windows deallocation, crash at MPI_Win_lock 95 // Bug or not ? 96 // MPI_Comm_free(&winComm) ; 97 } 101 else MPI_Comm_split(interCommMerged, intraCommRank, rank, &winComm); 102 // ym : Warning : intelMPI doesn't support that communicator of windows be deallocated before the windows deallocation, crash at MPI_Win_lock 103 // Bug or not ? 104 // MPI_Comm_free(&winComm) ; 105 } 106 107 // free dummy intercommunicator 108 for(int rank=0; rank<clientSize ; rank++) MPI_Comm_free(&dummyComm[rank]) ; 109 MPI_Comm_free(&commSelf) ; 110 CTimer::get("create Windows").suspend() ; 98 111 } 99 112 else … … 103 116 windows[1]=MPI_WIN_NULL ; 104 117 } 105 106 107 118 108 MPI_Comm_split(intraComm_,intraCommRank,intraCommRank, &commSelf) ;109 119 itLastTimeLine=lastTimeLine.begin() ; 110 120 … … 138 148 bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/) 139 149 { 150 CTimer::get("listen request").resume(); 140 151 listen(); 152 CTimer::get("listen request").suspend(); 153 CTimer::get("check pending request").resume(); 141 154 checkPendingRequest(); 155 checkPendingProbe() ; 156 CTimer::get("check pending request").suspend(); 157 CTimer::get("check event process").resume(); 142 158 if (enableEventsProcessing) processEvents(); 159 CTimer::get("check event process").suspend(); 143 160 return finished; 144 161 } 145 162 /* 146 163 void CContextServer::listen(void) 147 164 { … … 221 238 } 222 239 } 240 */ 241 242 void CContextServer::listen(void) 243 { 244 int rank; 245 int flag; 246 int count; 247 char * addr; 248 MPI_Status status; 249 MPI_Message message ; 250 map<int,CServerBuffer*>::iterator it; 251 bool okLoop; 252 253 traceOff(); 254 MPI_Improbe(MPI_ANY_SOURCE, 20,interComm,&flag,&message, &status); 255 traceOn(); 256 if (flag==true) listenPendingRequest(message, status) ; 257 } 258 259 bool CContextServer::listenPendingRequest( MPI_Message &message, MPI_Status& status) 260 { 261 int count; 262 char * addr; 263 map<int,CServerBuffer*>::iterator it; 264 int rank=status.MPI_SOURCE ; 265 266 it=buffers.find(rank); 267 if (it==buffers.end()) // Receive the buffer size and allocate the buffer 268 { 269 MPI_Aint recvBuff[4] ; 270 MPI_Mrecv(recvBuff, 4, MPI_AINT, &message, &status); 271 remoteHashId_ = recvBuff[0] ; 272 StdSize buffSize = recvBuff[1]; 273 vector<MPI_Aint> winAdress(2) ; 274 winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ; 275 mapBufferSize_.insert(std::make_pair(rank, buffSize)); 276 it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first; 277 lastTimeLine[rank]=0 ; 278 itLastTimeLine=lastTimeLine.begin() ; 279 return true; 280 } 281 else 282 { 283 std::pair<MPI_Message,MPI_Status> mypair(message,status) ; 284 pendingProbe[rank].push_back(mypair) ; 285 return false; 286 } 287 } 288 289 void CContextServer::checkPendingProbe(void) 290 { 291 292 list<int> recvProbe ; 293 list<int>::iterator itRecv ; 294 map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe; 295 296 for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++) 297 { 298 int rank=itProbe->first ; 299 if (pendingRequest.count(rank)==0) 300 { 301 MPI_Message& message = itProbe->second.front().first ; 302 MPI_Status& status = itProbe->second.front().second ; 303 int count ; 304 MPI_Get_count(&status,MPI_CHAR,&count); 305 map<int,CServerBuffer*>::iterator it = buffers.find(rank); 306 if (it->second->isBufferFree(count)) 307 { 308 char * addr; 309 addr=(char*)it->second->getBuffer(count); 310 MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]); 311 bufferRequest[rank]=addr; 312 recvProbe.push_back(rank) ; 313 itProbe->second.pop_front() ; 314 } 315 } 316 } 317 318 for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ; 319 } 223 320 224 321 … … 232 329 int count; 233 330 MPI_Status status; 331 332 if (!pendingRequest.empty()) CTimer::get("receiving requests").resume(); 333 else CTimer::get("receiving requests").suspend(); 234 334 235 335 for(it=pendingRequest.begin();it!=pendingRequest.end();it++) … … 257 357 void CContextServer::getBufferFromClient(size_t timeLine) 258 358 { 359 CTimer::get("CContextServer::getBufferFromClient").resume() ; 259 360 if (!isAttachedModeEnabled()) // one sided desactivated in attached mode 260 361 { … … 267 368 { 268 369 rank=itLastTimeLine->first ; 269 if (itLastTimeLine->second < timeLine && pendingRequest.count(rank)==0 )370 if (itLastTimeLine->second < timeLine && pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty()) 270 371 { 271 if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) 272 { 273 processRequest(rank, buffer, count); 274 break ; 275 } 372 if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count); 373 if (count >= 0) break ; 276 374 } 277 375 } 278 376 } 377 CTimer::get("CContextServer::getBufferFromClient").suspend() ; 279 378 } 280 379 … … 388 487 } 389 488 } 390 else getBufferFromClient(currentTimeLine) ;489 else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ; 391 490 } 392 491 else if (pureOneSided) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line … … 441 540 // releaseBuffers() ; 442 541 notifyClientsFinalize() ; 542 CTimer::get("receiving requests").suspend(); 443 543 context->finalize(); 444 544 … … 446 546 MPI_Win_free(&windows[0]) ; 447 547 MPI_Win_free(&windows[1]) ; 448 548 449 549 std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 450 550 iteMap = mapBufferSize_.end(), itMap; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.hpp
r2230 r2246 19 19 bool eventLoop(bool enableEventsProcessing = true); 20 20 void listen(void) ; 21 bool listenPendingRequest(MPI_Status& status) ; 21 // bool listenPendingRequest(MPI_Status& status) ; 22 bool listenPendingRequest(MPI_Message &message, MPI_Status& status) ; 23 void checkPendingProbe(void) ; 22 24 void checkPendingRequest(void) ; 23 25 void getBufferFromClient(size_t timeLine) ; … … 42 44 MPI_Comm interCommMerged; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication. 43 45 44 MPI_Comm commSelf; //!< Communicator of the server alone. Needed to create a new communicator between 1 proc client and 1 proc server for one sided communication45 46 46 map<int,CServerBuffer*> buffers ; 47 47 map<int,size_t> lastTimeLine ; //!< last event time line for a processed request 48 48 map<int,size_t>::iterator itLastTimeLine ; //!< iterator on lastTimeLine 49 map<int, list<std::pair<MPI_Message,MPI_Status> > > pendingProbe; 49 50 map<int,MPI_Request> pendingRequest ; 50 51 map<int,char*> bufferRequest ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/distribution/distribution_type.hpp
r1930 r2246 4 4 namespace xios 5 5 { 6 enum class EDistributionType { NONE=0, ROOT, BANDS, BLOCKS } ;6 enum class EDistributionType { NONE=0, ROOT, BANDS, BLOCKS, COLUMNS} ; 7 7 } 8 8 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/contexts_manager.cpp
r1765 r2246 7 7 #include "servers_ressource.hpp" 8 8 #include "server.hpp" 9 #include "timer.hpp" 9 10 #include <functional> 10 11 … … 48 49 int serviceLeader ; 49 50 auto servicesManager = CXios::getServicesManager() ; 50 51 51 52 bool ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ; 52 53 54 info(40)<<"CContextsManager::createServerContext : waiting for service leader ; serviceId : "<<serviceId<<endl ; 53 55 if (wait) 54 56 { … … 64 66 notifyType_=NOTIFY_CREATE_CONTEXT ; 65 67 notifyCreateContext_=make_tuple(poolId, serviceId, partitionId, contextId) ; 68 info(40)<<"CContextsManager::createServerContext : notification create_context to service leader "<<serviceLeader<<", serviceId : "<<serviceId<<", contextId "<<contextId<<endl ; 66 69 sendNotification(serviceLeader) ; 67 70 return true ; … … 80 83 81 84 int type ; 85 info(40)<<"CContextsManager::createServerContextIntercomm : waiting for context leader ; contextId : "<<contextId<<endl ; 82 86 ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ; 83 87 if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ; … … 96 100 notifyType_=NOTIFY_CREATE_INTERCOMM ; 97 101 notifyCreateIntercomm_=make_tuple(poolId, serviceId, partitionId, contextId, remoteLeader, sourceContext) ; 102 info(40)<<"CContextsManager::createServerContextIntercomm : notification create_intercomm to context leader : "<<contextLeader<<", contextId :"<<contextId<<endl ; 98 103 sendNotification(contextLeader) ; 99 104 return true ; … … 149 154 void CContextsManager::eventLoop(void) 150 155 { 151 checkNotifications() ; 156 CTimer::get("CContextsManager::eventLoop").resume(); 157 double time=MPI_Wtime() ; 158 if (time-lastEventLoop_ > eventLoopLatency_) 159 { 160 checkNotifications() ; 161 lastEventLoop_=time ; 162 } 163 CTimer::get("CContextsManager::eventLoop").suspend(); 152 164 } 153 165 … … 166 178 void CContextsManager::createServerContext(void) 167 179 { 180 info(40)<<"CContextsManager::createServerContext : receive create server context notification"<<endl ; 168 181 auto arg=notifyCreateContext_ ; 169 182 CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg)) … … 174 187 void CContextsManager::createServerContextIntercomm(void) 175 188 { 189 info(40)<<"CContextsManager::createServerContext : receive create intercomm context notification"<<endl ; 176 190 auto arg=notifyCreateIntercomm_ ; 177 191 CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg)) … … 194 208 void CContextsManager::registerContext(const string& fullContextId, const SRegisterContextInfo& contextInfo) 195 209 { 196 winContexts_->lockWindow(managerGlobalLeader_,0) ; 197 winContexts_->updateFromWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ; 210 winContexts_->lockWindowExclusive(managerGlobalLeader_) ; 211 winContexts_->updateFromLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ; 212 winContexts_->flushWindow(managerGlobalLeader_) ; 198 213 contexts_[fullContextId] = contextInfo ; 199 winContexts_->updateTo Window(managerGlobalLeader_, this, &CContextsManager::contextsDumpOut) ;200 winContexts_->unlockWindow(managerGlobalLeader_ ,0) ;214 winContexts_->updateToLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpOut) ; 215 winContexts_->unlockWindow(managerGlobalLeader_) ; 201 216 } 202 217 … … 210 225 { 211 226 212 winContexts_->lockWindow (managerGlobalLeader_,0) ;213 winContexts_->updateFrom Window(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;214 winContexts_->unlockWindow(managerGlobalLeader_ ,0) ;227 winContexts_->lockWindowShared(managerGlobalLeader_) ; 228 winContexts_->updateFromLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ; 229 winContexts_->unlockWindow(managerGlobalLeader_) ; 215 230 216 231 auto it=contexts_.find(fullContextId) ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/contexts_manager.hpp
r1765 r2246 82 82 83 83 int managerGlobalLeader_ ; 84 84 85 const double eventLoopLatency_=1e-2; 86 double lastEventLoop_=0. ; 87 85 88 friend class CWindowManager ; 86 89 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/daemons_manager.cpp
r2209 r2246 41 41 bool CDaemonsManager::eventLoop(void) 42 42 { 43 43 44 CXios::getRessourcesManager()->eventLoop() ; 44 45 CXios::getServicesManager()->eventLoop() ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/pool_ressource.cpp
r2208 r2246 6 6 #include "type.hpp" 7 7 #include "cxios.hpp" 8 #include "timer.hpp" 8 9 9 10 namespace xios … … 16 17 MPI_Comm_rank(poolComm, &commRank) ; 17 18 MPI_Comm_size(poolComm, &commSize) ; 18 19 19 info(40)<<"CPoolRessource::CPoolRessource : creating new pool : "<<Id<<endl ; 20 20 if (commRank==localLeader_) 21 21 { … … 51 51 occupancy_.erase(occupancy_.begin(),it) ; 52 52 occupancy_.insert(procs_update.begin(),procs_update.end()) ; 53 53 54 info(40)<<"CPoolRessource::createService : notify createService to all pool members ; serviceId : "<<serviceId<<endl ; 54 55 for(int rank=0; rank<commSize; rank++) 55 56 { … … 102 103 bool CPoolRessource::eventLoop(bool serviceOnly) 103 104 { 104 checkCreateServiceNotification() ; 105 CTimer::get("CPoolRessource::eventLoop").resume(); 106 107 double time=MPI_Wtime() ; 108 if (time-lastEventLoop_ > eventLoopLatency_) 109 { 110 checkCreateServiceNotification() ; 111 lastEventLoop_=time ; 112 } 113 105 114 for (auto it=services_.begin(); it!=services_.end() ; ++it) 106 115 { … … 112 121 } 113 122 } 114 123 CTimer::get("CPoolRessource::eventLoop").suspend(); 115 124 if (services_.empty() && finalizeSignal_) return true ; 116 125 else return false ; … … 137 146 void CPoolRessource::createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in) 138 147 { 148 149 info(40)<<"CPoolRessource::createNewService : receive createService notification ; serviceId : "<<serviceId<<endl ; 139 150 MPI_Comm serviceComm, newServiceComm ; 140 151 int commRank ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/pool_ressource.hpp
r1764 r2246 44 44 std::string Id_ ; 45 45 bool finalizeSignal_ ; 46 46 47 const double eventLoopLatency_=1e-2; 48 double lastEventLoop_=0. ; 47 49 }; 48 50 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/ressources_manager.cpp
r1764 r2246 2 2 #include "server.hpp" 3 3 #include "servers_ressource.hpp" 4 #include "timer.hpp" 4 5 5 6 … … 43 44 void CRessourcesManager::createPool(const string& poolId, int size) 44 45 { 46 info(40)<<"CRessourcesManager::createPool : calling createPool : "<<poolId<<" of size"<<size<<endl ; 47 info(40)<<"send notification to leader : "<<serverLeader_<<endl ; 45 48 winRessources_->lockWindow(managerGlobalLeader_,0) ; 46 49 winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ; … … 49 52 notifyType_=NOTIFY_CREATE_POOL ; 50 53 notifyCreatePool_=make_tuple(poolId, size) ; 54 info(40)<<"CRessourcesManager::createPool : send notification creating pool to server leader "<<serverLeader_<<endl ; 51 55 sendNotification(serverLeader_) ; 52 56 } … … 61 65 { 62 66 notifyType_=NOTIFY_FINALIZE ; 67 info(40)<<"CRessourcesManager::finalize : send notification finalize to server leader "<<serverLeader_<<endl ; 63 68 sendNotification(serverLeader_) ; 64 69 } … … 107 112 void CRessourcesManager::eventLoop(void) 108 113 { 109 checkNotifications() ; 114 CTimer::get("CRessourcesManager::eventLoop").resume(); 115 double time=MPI_Wtime() ; 116 if (time-lastEventLoop_ > eventLoopLatency_) 117 { 118 checkNotifications() ; 119 lastEventLoop_=time ; 120 } 121 122 CTimer::get("CRessourcesManager::eventLoop").suspend(); 110 123 } 111 124 … … 114 127 int commRank ; 115 128 MPI_Comm_rank(xiosComm_, &commRank) ; 129 CTimer::get("CRessourcesManager::checkNotifications lock").resume(); 116 130 winNotify_->lockWindow(commRank,0) ; 131 CTimer::get("CRessourcesManager::checkNotifications lock").suspend(); 132 CTimer::get("CRessourcesManager::checkNotifications pop").resume(); 117 133 winNotify_->popFromWindow(commRank, this, &CRessourcesManager::notificationsDumpIn) ; 134 CTimer::get("CRessourcesManager::checkNotifications pop").suspend(); 135 CTimer::get("CRessourcesManager::checkNotifications unlock").resume(); 118 136 winNotify_->unlockWindow(commRank,0) ; 137 CTimer::get("CRessourcesManager::checkNotifications unlock").suspend(); 119 138 if (notifyType_==NOTIFY_CREATE_POOL) createPool() ; 120 139 else if (notifyType_==NOTIFY_FINALIZE) finalizeSignal() ; … … 123 142 void CRessourcesManager::createPool(void) 124 143 { 144 125 145 auto& arg=notifyCreatePool_ ; 126 146 string poolId=get<0>(arg) ; 127 147 int size=get<1>(arg) ; 148 info(40)<<"CRessourcesManager::createPool : receive create pool notification : "<< poolId<<" of size "<<size<<endl ; 128 149 CServer::getServersRessource()->createPool(poolId,size) ; 129 150 } … … 131 152 void CRessourcesManager::finalizeSignal(void) 132 153 { 154 info(40)<<"CRessourcesManager::createPool : receive finalize notification"<<endl ; 133 155 CServer::getServersRessource()->finalize() ; 134 156 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/ressources_manager.hpp
r1764 r2246 72 72 int freeRessourcesSize_ ; 73 73 74 const double eventLoopLatency_=1e-2; 75 double lastEventLoop_=0. ; 76 74 77 friend class CWindowManager ; 75 78 } ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/server_context.cpp
r2230 r2246 6 6 #include "register_context_info.hpp" 7 7 #include "services.hpp" 8 #include "timer.hpp" 8 9 9 10 … … 18 19 hasNotification_(false) 19 20 { 21 info(40)<<"CCServerContext::CServerContext : new context creation ; contextId : "<<contextId<<endl ; 20 22 int localRank, globalRank, commSize ; 21 23 … … 58 60 const MPI_Comm& intraComm, MPI_Comm& interCommClient, MPI_Comm& interCommServer, bool wait) 59 61 { 62 info(40)<<"CServerContext::createIntercomm : context intercomm creation ; contextId : "<<contextId<<endl ; 60 63 int intraCommRank ; 61 64 MPI_Comm_rank(intraComm, &intraCommRank) ; … … 145 148 int commSize ; 146 149 MPI_Comm_size(contextComm_,&commSize) ; 150 info(40)<<"CServerContext::createIntercomm : notify createContextIntercomm to all context members ; sourceContext : "<<sourceContext<<endl ; 151 147 152 for(int rank=0; rank<commSize; rank++) 148 153 { … … 191 196 if (!hasNotification_) 192 197 { 193 int commRank ; 194 MPI_Comm_rank(contextComm_, &commRank) ; 195 winNotify_->lockWindow(commRank,0) ; 196 winNotify_->popFromWindow(commRank, this, &CServerContext::notificationsDumpIn) ; 197 winNotify_->unlockWindow(commRank,0) ; 198 double time=MPI_Wtime() ; 199 if (time-lastEventLoop_ > eventLoopLatency_) 200 { 201 int commRank ; 202 MPI_Comm_rank(contextComm_, &commRank) ; 203 winNotify_->lockWindow(commRank,0) ; 204 winNotify_->popFromWindow(commRank, this, &CServerContext::notificationsDumpIn) ; 205 winNotify_->unlockWindow(commRank,0) ; 198 206 199 if (notifyInType_!= NOTIFY_NOTHING) 200 { 201 hasNotification_=true ; 202 auto eventScheduler=parentService_->getEventScheduler() ; 203 std::hash<string> hashString ; 204 size_t hashId = hashString(name_) ; 205 size_t currentTimeLine=0 ; 206 eventScheduler->registerEvent(currentTimeLine,hashId); 207 if (notifyInType_!= NOTIFY_NOTHING) 208 { 209 hasNotification_=true ; 210 auto eventScheduler=parentService_->getEventScheduler() ; 211 std::hash<string> hashString ; 212 size_t hashId = hashString(name_) ; 213 size_t currentTimeLine=0 ; 214 eventScheduler->registerEvent(currentTimeLine,hashId); 215 } 216 lastEventLoop_=time ; 207 217 } 208 218 } … … 225 235 bool CServerContext::eventLoop(bool serviceOnly) 226 236 { 237 CTimer::get("CServerContext::eventLoop").resume(); 227 238 bool finished=false ; 228 if (winNotify_!=nullptr) checkNotifications() ; 239 240 // double time=MPI_Wtime() ; 241 // if (time-lastEventLoop_ > eventLoopLatency_) 242 // { 243 if (winNotify_!=nullptr) checkNotifications() ; 244 // lastEventLoop_=time ; 245 // } 246 247 229 248 if (!serviceOnly && context_!=nullptr) 230 249 { … … 235 254 } 236 255 } 237 256 CTimer::get("CServerContext::eventLoop").suspend(); 238 257 if (context_==nullptr && finalizeSignal_) finished=true ; 239 258 return finished ; … … 242 261 void CServerContext::createIntercomm(void) 243 262 { 263 info(40)<<"CServerContext::createIntercomm : received createIntercomm notification"<<endl ; 264 244 265 MPI_Comm interCommServer, interCommClient ; 245 266 auto& arg=notifyInCreateIntercomm_ ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/server_context.hpp
r2130 r2246 61 61 bool isAttachedMode_ ; 62 62 63 const double eventLoopLatency_=1e-2; 64 double lastEventLoop_=0. ; 65 63 66 friend class CWindowManager ; 64 67 } ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/servers_ressource.cpp
r2220 r2246 5 5 #include "cxios.hpp" 6 6 #include "mpi.hpp" 7 #include "timer.hpp" 7 8 #include <vector> 8 9 #include <string> … … 115 116 bool CServersRessource::eventLoop(bool serviceOnly) 116 117 { 117 checkNotifications() ; 118 CTimer::get("CServersRessource::eventLoop").resume(); 119 double time=MPI_Wtime() ; 120 if (time-lastEventLoop_ > eventLoopLatency_) 121 { 122 checkNotifications() ; 123 lastEventLoop_=time ; 124 } 125 118 126 if (poolRessource_!=nullptr) 119 127 { … … 124 132 } 125 133 } 126 134 CTimer::get("CServersRessource::eventLoop").suspend(); 127 135 if (poolRessource_==nullptr && finalizeSignal_) return true ; 128 136 else return false ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/servers_ressource.hpp
r1764 r2246 50 50 bool finalizeSignal_ ; 51 51 52 const double eventLoopLatency_=1e-2; 53 double lastEventLoop_=0. ; 54 52 55 friend class CWindowManager ; 53 56 } ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services.cpp
r2230 r2246 5 5 #include "server_context.hpp" 6 6 #include "event_scheduler.hpp" 7 #include "timer.hpp" 7 8 8 9 namespace xios … … 14 15 15 16 { 17 info(40)<<"CService::CService : new service created ; serviceId : "<<serviceId<<endl ; 18 16 19 int localRank, globalRank, commSize ; 17 20 … … 44 47 int commSize ; 45 48 MPI_Comm_size(serviceComm_, &commSize) ; 46 49 info(40)<<"CService::createContext : notify CreateContext to all services members ; serviceId : "<<serviceId<<" ; contextId : "<<contextId<<endl ; 50 47 51 for(int rank=0; rank<commSize; rank++) 48 52 { … … 51 55 sendNotification(rank) ; 52 56 } 57 info(40)<<"CService::createContext : notify CreateContext to all services members : DONE "<<endl ; 53 58 } 54 59 /* … … 99 104 { 100 105 //checkCreateContextNotification() ; 101 checkNotifications() ; 106 CTimer::get("CService::eventLoop").resume(); 107 108 // double time=MPI_Wtime() ; 109 // if (time-lastEventLoop_ > eventLoopLatency_) 110 // { 111 checkNotifications() ; 112 // lastEventLoop_=time ; 113 // } 114 102 115 103 116 eventScheduler_->checkEvent() ; … … 111 124 } ; 112 125 } 113 126 CTimer::get("CService::eventLoop").suspend(); 114 127 if (contexts_.empty() && finalizeSignal_) return true ; 115 128 else return false ; … … 144 157 if (notifyInType_==NOTIFY_CREATE_CONTEXT) 145 158 { 146 info(10)<<"NotifyDumpOut"<<endl ;147 159 auto& arg=notifyInCreateContext_ ; 148 160 buffer >> std::get<0>(arg)>> std::get<1>(arg) >> std::get<2>(arg)>> std::get<3>(arg); … … 158 170 if (!hasNotification_) 159 171 { 160 int commRank ; 161 MPI_Comm_rank(serviceComm_, &commRank) ; 162 winNotify_->lockWindow(commRank,0) ; 163 winNotify_->popFromWindow(commRank, this, &CService::notificationsDumpIn) ; 164 winNotify_->unlockWindow(commRank,0) ; 172 double time=MPI_Wtime() ; 173 if (time-lastEventLoop_ > eventLoopLatency_) 174 { 175 int commRank ; 176 MPI_Comm_rank(serviceComm_, &commRank) ; 177 winNotify_->lockWindow(commRank,0) ; 178 winNotify_->popFromWindow(commRank, this, &CService::notificationsDumpIn) ; 179 winNotify_->unlockWindow(commRank,0) ; 165 180 166 if (notifyInType_!= NOTIFY_NOTHING) 167 { 168 hasNotification_=true ; 169 std::hash<string> hashString ; 170 size_t hashId = hashString(name_) ; 171 size_t currentTimeLine=0 ; 172 eventScheduler_->registerEvent(currentTimeLine,hashId); 181 if (notifyInType_!= NOTIFY_NOTHING) 182 { 183 hasNotification_=true ; 184 std::hash<string> hashString ; 185 size_t hashId = hashString(name_) ; 186 size_t currentTimeLine=0 ; 187 info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler"<<endl ; 188 eventScheduler_->registerEvent(currentTimeLine,hashId); 189 } 190 lastEventLoop_=time ; 173 191 } 174 192 } … … 179 197 size_t hashId = hashString(name_) ; 180 198 size_t currentTimeLine=0 ; 199 info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : eventIsReceived ?"<<endl ; 181 200 if (eventScheduler_->queryEvent(currentTimeLine,hashId)) 182 201 { 183 202 eventScheduler_->popEvent() ; 203 info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : RECEIVED"<<endl ; 184 204 if (notifyInType_==NOTIFY_CREATE_CONTEXT) createContext() ; 185 205 hasNotification_=false ; … … 190 210 191 211 192 212 //ym not use any more 193 213 void CService::checkCreateContextNotification(void) 194 214 { … … 210 230 void CService::createContext(void) 211 231 { 232 info(40)<<"CService::createContext(void) : receive createContext notification"<<endl ; 212 233 auto& arg=notifyInCreateContext_ ; 213 234 string poolId = get<0>(arg) ; … … 218 239 } 219 240 220 //to remove 241 //to remove, not used anymore 221 242 void CService::createNewContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId) 222 243 { -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services.hpp
r2130 r2246 71 71 int nbPartitions_ ; 72 72 73 const double eventLoopLatency_=1e-2; 74 double lastEventLoop_=0. ; 75 73 76 }; 74 77 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services_manager.cpp
r1764 r2246 7 7 #include "server.hpp" 8 8 #include "servers_ressource.hpp" 9 #include "timer.hpp" 9 10 10 11 namespace xios … … 55 56 int poolSize ; 56 57 58 info(40)<<"CServicesManager : waiting for pool info : "<<poolId<<endl ; ; 57 59 bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ; 58 60 if (wait) … … 67 69 if (ok) 68 70 { 71 info(40)<<"CServicesManager : create service notification to leader "<<leader<<", serviceId : "<<serviceId<<", size : "<<size<<endl ; 69 72 createServicesNotify(leader, serviceId, type, size, nbPartitions) ; 70 73 return true ; … … 94 97 { 95 98 auto info = notifications_.front() ; 99 xios::info(40)<<"CServicesManager : receive create service notification : "<<get<0>(info)<<endl ; 96 100 CServer::getServersRessource()->getPoolRessource()->createService(get<0>(info), get<1>(info), get<2>(info), get<3>(info)) ; 97 101 notifications_.pop_front() ; … … 104 108 void CServicesManager::eventLoop(void) 105 109 { 106 checkCreateServicesNotification() ; 110 CTimer::get("CServicesManager::eventLoop").resume(); 111 double time=MPI_Wtime() ; 112 if (time-lastEventLoop_ > eventLoopLatency_) 113 { 114 checkCreateServicesNotification() ; 115 lastEventLoop_=time ; 116 } 117 CTimer::get("CServicesManager::eventLoop").suspend(); 107 118 } 108 119 … … 176 187 { 177 188 189 info(40)<<"CServicesManager : registering service, poolId : "<<poolId<<", serviceId : "<<serviceId<<endl ; ; 190 191 winServices_->lockWindowExclusive(managerGlobalLeader_) ; 192 winServices_->updateFromLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 193 winServices_->flushWindow(managerGlobalLeader_) ; 194 services_[std::tuple<std::string, std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ; 195 winServices_->updateToLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpOut) ; 196 winServices_->unlockWindow(managerGlobalLeader_) ; 197 198 /* 178 199 winServices_->lockWindow(managerGlobalLeader_,0) ; 179 200 winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 180 201 services_[std::tuple<std::string, std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ; 181 202 winServices_->updateToWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpOut) ; 182 winServices_->unlockWindow(managerGlobalLeader_,0) ; 203 winServices_->unlockWindow(managerGlobalLeader_,0) ;*/ 183 204 } 184 205 … … 186 207 int& size, int& nbPartitions, int& leader) 187 208 { 209 210 winServices_->lockWindowShared(managerGlobalLeader_) ; 211 winServices_->updateFromLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 212 winServices_->unlockWindow(managerGlobalLeader_) ; 213 /* 188 214 winServices_->lockWindow(managerGlobalLeader_,0) ; 189 215 winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 190 winServices_->unlockWindow(managerGlobalLeader_,0) ; 216 winServices_->unlockWindow(managerGlobalLeader_,0) ;*/ 191 217 192 218 auto it=services_.find(std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)) ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services_manager.hpp
r1764 r2246 59 59 60 60 int managerGlobalLeader_ ; 61 62 const double eventLoopLatency_=1e-2; 63 double lastEventLoop_=0. ; 61 64 62 65 friend class CWindowManager ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/window_manager.hpp
r1764 r2246 25 25 MPI_Win window_ ; 26 26 void * winBuffer_ ; 27 map<int,double> lastTimeLock_ ; 28 const double latency_=0e-2 ; 27 29 28 30 public : … … 46 48 { 47 49 int lock=state ; 48 50 double time ; 51 auto it=lastTimeLock_.find(rank) ; 52 if (it == lastTimeLock_.end()) 53 { 54 lastTimeLock_[rank] = 0. ; 55 it=lastTimeLock_.find(rank) ; 56 } 57 double& lastTime = it->second ; 58 49 59 do 50 60 { 61 time=MPI_Wtime() ; 62 while(time-lastTime < latency_) time=MPI_Wtime() ; 51 63 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 52 64 MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ; 53 65 MPI_Win_unlock(rank, window_) ; 66 lastTime=MPI_Wtime() ; 54 67 } while (lock!=state) ; 55 56 57 } 58 68 69 70 } 71 72 void lockWindowExclusive(int rank, int state ) 73 { 74 int lock=state ; 75 double time ; 76 auto it=lastTimeLock_.find(rank) ; 77 if (it == lastTimeLock_.end()) 78 { 79 lastTimeLock_[rank] = 0. ; 80 it=lastTimeLock_.find(rank) ; 81 } 82 double& lastTime = it->second ; 83 84 do 85 { 86 time=MPI_Wtime() ; 87 while(time-lastTime < latency_) time=MPI_Wtime() ; 88 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 89 MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ; 90 MPI_Win_unlock(rank, window_) ; 91 lastTime=MPI_Wtime() ; 92 } while (lock!=state) ; 93 } 94 95 void lockWindowExclusive(int rank) 96 { 97 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 98 } 99 100 void lockWindowShared(int rank) 101 { 102 MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ; 103 } 104 105 void unlockWindow(int rank) 106 { 107 MPI_Win_unlock(rank, window_) ; 108 } 109 110 void flushWindow(int rank) 111 { 112 MPI_Win_flush(rank, window_) ; 113 } 59 114 60 115 void unlockWindow(int rank, int state ) … … 77 132 MPI_Win_unlock(rank, window_) ; 78 133 } 79 134 135 template< class T > 136 void updateToLockedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) ) 137 { 138 CBufferOut buffer ; 139 (object->*dumpOut)(buffer) ; 140 size_t size=buffer.count() ; 141 // MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 142 MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ; 143 MPI_Put(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ; 144 // MPI_Win_unlock(rank, window_) ; 145 } 146 80 147 template< typename T > 81 148 void updateFromWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) … … 90 157 (object->*dumpIn)(buffer) ; 91 158 } 159 160 template< typename T > 161 void updateFromLockedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) ) 162 { 163 size_t size ; 164 // MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ; 165 MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ; 166 MPI_Win_flush(rank,window_) ; 167 CBufferIn buffer(size) ; 168 MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ; 169 // MPI_Win_unlock(rank, window_) ; 170 MPI_Win_flush(rank, window_) ; 171 (object->*dumpIn)(buffer) ; 172 } 173 92 174 93 175 template< class T >
Note: See TracChangeset
for help on using the changeset viewer.