Changeset 1757

Timestamp: 10/18/19 14:55:57 (5 years ago)
Location:  XIOS/dev/dev_ym/XIOS_ONE_SIDED/src
Files:     9 edited

Legend (diff markers used below):
  (no prefix)  Unmodified
  +            Added
  -            Removed
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_client.cpp
r1639 → r1757

       size_t CClientBuffer::maxRequestSize = 0;

 -     CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize, StdSize maxBufferedEvents)
 +     CClientBuffer::CClientBuffer(MPI_Comm interComm, vector<MPI_Win>& windows, int clientRank, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize)
         : interComm(interComm)
 +       , clientRank_(clientRank)
         , serverRank(serverRank)
         , bufferSize(bufferSize)
 …
         , current(0)
         , count(0)
 -       , bufferedEvents(0)
 -       , maxBufferedEvents(maxBufferedEvents)
         , pending(false)
 -     {
 -       buffer[0] = new char[bufferSize]; // transform it with MPI_ALLOC_MEM later
 -       buffer[1] = new char[bufferSize];
 +       , hasWindows(false)
 +       , windows_(windows)
 +     {
 +       if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
 +       else hasWindows=true ;
 +
 +       MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ;
 +       MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ;
 +       buffer[0] = bufferHeader[0]+headerSize ;
 +       buffer[1] = bufferHeader[1]+headerSize ;
 +       firstTimeLine[0]=(size_t*)bufferHeader[0] ;
 +       firstTimeLine[1]=(size_t*)bufferHeader[1] ;
 +       bufferCount[0]=(size_t*)bufferHeader[0] +1 ;
 +       bufferCount[1]=(size_t*)bufferHeader[1] +1 ;
 +       control[0]=(size_t*)bufferHeader[0] +2 ;
 +       control[1]=(size_t*)bufferHeader[1] +2 ;
 +       finalize[0]=(size_t*)bufferHeader[0] +3 ;
 +       finalize[1]=(size_t*)bufferHeader[1] +3 ;
 +
 +       *firstTimeLine[0]=0 ;
 +       *firstTimeLine[1]=0 ;
 +       *bufferCount[0]=0 ;
 +       *bufferCount[1]=0 ;
 +       *control[0]=0 ;
 +       *control[1]=0 ;
 +       *finalize[0]=0 ;
 +       *finalize[1]=0 ;
 +       winState[0]=false ;
 +       winState[1]=false ;
 +
 +       if (hasWindows)
 +       {
 +         MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize) ;
 +         MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize) ;
 +
 +         MPI_Group group ;
 +         int groupSize,groupRank ;
 +         MPI_Win_get_group(windows_[0], &group) ;
 +         MPI_Group_size(group, &groupSize) ;
 +         MPI_Group_rank(group, &groupRank) ;
 +         if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
 +
 +         MPI_Win_get_group(windows_[1], &group) ;
 +         MPI_Group_size(group, &groupSize) ;
 +         MPI_Group_rank(group, &groupRank) ;
 +         if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
 +
 +         MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
 +         MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
 +
 +         MPI_Win_unlock(clientRank_, windows_[1]) ;
 +         MPI_Win_unlock(clientRank_, windows_[0]) ;
 +       }
         retBuffer = new CBufferOut(buffer[current], bufferSize);
 -       info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << " with a maximum of " << maxBufferedEvents << " buffered events" << endl;
 +       info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl;
 +     }
 +
 +     MPI_Aint CClientBuffer::getWinAddress(int i)
 +     {
 +       MPI_Aint address ;
 +
 +       if (hasWindows) MPI_Get_address(bufferHeader[i], &address) ;
 +       else address=0 ;
 +
 +       return address ;
       }

       CClientBuffer::~CClientBuffer()
       {
 -       delete [] buffer[0];
 -       delete [] buffer[1];
 -       delete retBuffer;
 +       //freeWindows() ;
 +       if (hasWindows)
 +       {
 +         MPI_Win_detach(windows_[0],bufferHeader[0]);
 +         MPI_Win_detach(windows_[1],bufferHeader[1]);
 +         MPI_Free_mem(bufferHeader[0]) ;
 +         MPI_Free_mem(bufferHeader[1]) ;
 +       }
 +       delete retBuffer;
       }

 +/*   void CClientBuffer::createWindows(MPI_Comm oneSidedComm)
 +     {
 +       MPI_Barrier(oneSidedComm) ;
 +       MPI_Win_create(bufferHeader[0], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ;
 +       MPI_Win_create(bufferHeader[1], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ;
 +
 +       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ;
 +       *firstTimeLine[0]=0 ;
 +       *bufferCount[0]=0 ;
 +       *control[0]=0 ;
 +       MPI_Win_unlock(0, windows[0]) ;
 +
 +       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ;
 +       *firstTimeLine[1]=0 ;
 +       *bufferCount[1]=0 ;
 +       *control[1]=0 ;
 +       MPI_Win_unlock(0, windows[1]) ;
 +       winState[0]=false ;
 +       winState[1]=false ;
 +       MPI_Barrier(oneSidedComm) ;
 +       hasWindows=true ;
 +     }
 +*/
 +
 +/*
 +     void CClientBuffer::freeWindows()
 +     {
 +       if (hasWindows)
 +       {
 +         MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[0]) ;
 +         MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[1]) ;
 +         *control[0]=2 ;
 +         *control[1]=2 ;
 +         MPI_Win_unlock(0, windows_[1]) ;
 +         MPI_Win_unlock(0, windows_[0]) ;
 +
 +         MPI_Win_free(&windows_[0]) ;
 +         MPI_Win_free(&windows_[1]) ;
 +         hasWindows=false ;
 +       }
 +     }
 +*/
 +     void CClientBuffer::lockBuffer(void)
 +     {
 +       if (hasWindows)
 +       {
 +         long long int lock=1 ;
 +         long long int zero=0, one=1 ;
 +
 +         MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ;
 +
 +         while(lock!=0)
 +         {
 +           MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)),
 +                                windows_[current]) ;
 +           MPI_Win_flush(clientRank_, windows_[current]) ;
 +         }
 +
 +         // info(100)<<"Buffer locked "<<&windows_<<" "<<current<<endl ;
 +         winState[current]=true ;
 +       }
 +     }
 +
 +     void CClientBuffer::unlockBuffer(void)
 +     {
 +       if (hasWindows)
 +       {
 +         long long int lock=1 ;
 +         long long int zero=0, one=1 ;
 +
 +         MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)),
 +                              windows_[current]) ;
 +         MPI_Win_unlock(clientRank_, windows_[current]) ;
 +
 +         // info(100)<<"Buffer unlocked "<<&windows_<<" "<<current<<endl ;
 +         winState[current]=false ;
 +       }
       }
 …
       bool CClientBuffer::isBufferFree(StdSize size)
       {
 +       // bool loop=true ;
 +       // while (loop)
 +       // {
 +       //   lockBuffer();
 +       //   if (*control[current]==0) loop=false ; // attemp to read from server ?
 +       //   else unlockBuffer() ;
 +       // }
 +
 +       lockBuffer();
         if (size > bufferSize)
           ERROR("bool CClientBuffer::isBufferFree(StdSize size)",
 …
         }

 -       return (size <= remain() && bufferedEvents < maxBufferedEvents);
 -     }
 -
 -     CBufferOut* CClientBuffer::getBuffer( StdSize size)
 +       count=*bufferCount[current] ;
 +       return (size <= remain());
 +     }
 +
 +     CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size)
       {
         if (size <= remain())
         {
           retBuffer->realloc(buffer[current] + count, size);
           count += size;
 -         bufferedEvents++;
 +         if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ;
 +         *bufferCount[current]=count ;
 +         /* info(50)<<"CClientBuffer::getBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current
 +                    <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ;
 +            if (!winState[current]) info(40)<<"CClientBuffer::getBuffer "<<" Windows Not Locked... "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current
 +                    <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ;*/
           return retBuffer;
         }
 …
       }

 -     bool CClientBuffer::checkBuffer(void)
 +     void CClientBuffer::infoBuffer(void)
 +     {
 +       char checksum=0 ;
 +       for(size_t i=0;i<*bufferCount[current];i++) checksum=checksum+buffer[current][i] ;
 +
 +       char checksumFirst=0 ;
 +       for(size_t i=5; i<10 && i<*bufferCount[current] ;i++) checksumFirst=checksumFirst+buffer[current][i] ;
 +
 +       char checksumLast=0 ;
 +       for(size_t i=(*bufferCount[current]<10)?0:*bufferCount[current]-10; i<*bufferCount[current] ; i++) checksumLast=checksumLast+buffer[current][i] ;
 +
 +       info(45)<<"CClientBuffer::infoBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current<<" WinState "<<winState[current]
 +               <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<" checksum "<<(int)checksum<<" "
 +               <<(int)buffer[current][0]<<" "<<(int)buffer[current][1]<<" "<<(int)buffer[current][2]<<" "<<(int)buffer[current][3]<<" "<<(int)buffer[current][4]<<" "<<(int)buffer[current][5]<<" "
 +               <<(int)buffer[current][6]<<" "<<(int)buffer[current][7]<<" "<<(int)buffer[current][8]<<" "<<(int)buffer[current][9]<<" "<<(int)buffer[current][10]<<" "<<(int)buffer[current][11]<<endl ;
 +     }
 +
 +     bool CClientBuffer::checkBuffer(bool send)
       {
         MPI_Status status;
 …
         if (!pending)
         {
 +         if (!send) return false ;
           if (count > 0)
           {
 -           MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request);
 -           pending = true;
 -           if (current == 1) current = 0;
 -           else current = 1;
 -           count = 0;
 -           bufferedEvents = 0;
 +           lockBuffer() ;
 +           // if (*control[current]==0 && bufferCount[current] > 0)
 +           if (*bufferCount[current] > 0)
 +           {
 +             MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request);
 +             pending = true;
 +             // *control[current]=0 ;
 +             *firstTimeLine[current]=0 ;
 +             *bufferCount[current]=0 ;
 +
 +             unlockBuffer() ;
 +
 +             if (current == 1) current = 0;
 +             else current = 1;
 +             count = 0;
 +           }
 +           else unlockBuffer() ;
           }
         }
 …
       bool CClientBuffer::hasPendingRequest(void)
       {
 +       lockBuffer() ;
 +       count=*bufferCount[current] ;
 +       unlockBuffer() ;
 +
         return (pending || count > 0);
       }
 +
 +     bool CClientBuffer::isNotifiedFinalized(void)
 +     {
 +       bool ret ;
 +       lockBuffer() ;
 +       ret=*finalize[current] == 1 ? true : false ;
 +       unlockBuffer() ;
 +
 +       return ret;
 +     }
 +
     }
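The heart of the new client-side protocol is the lockBuffer()/unlockBuffer() pair above: a spin lock built from MPI_Compare_and_swap on the third header word. The standalone toy program below (not part of the changeset; all names are illustrative) shows the same pattern in isolation. Unlike the XIOS code, the epoch here is opened with MPI_LOCK_SHARED so that it is clearly the CAS word, not the MPI epoch, that provides mutual exclusion between ranks.

    // Sketch of a one-sided spin lock over an MPI dynamic window.
    // Build with an MPI-3 implementation: mpicxx cas_lock.cpp && mpirun -n 4 ./a.out
    #include <mpi.h>
    #include <cstdio>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int rank;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);

      // Each rank attaches one control word; everyone will target rank 0's.
      MPI_Win win;
      MPI_Win_create_dynamic(MPI_INFO_NULL, MPI_COMM_WORLD, &win);
      long long control = 0;                          // 0 = unlocked, 1 = locked
      MPI_Win_attach(win, &control, sizeof(control));
      MPI_Aint addr;
      MPI_Get_address(&control, &addr);
      MPI_Bcast(&addr, 1, MPI_AINT, 0, MPI_COMM_WORLD);  // share rank 0's address

      const int target = 0;
      long long one = 1, zero = 0, result = 1;

      MPI_Win_lock(MPI_LOCK_SHARED, target, 0, win);  // passive-target epoch
      while (result != 0)                             // spin until CAS sees 0 -> 1
      {
        MPI_Compare_and_swap(&one, &zero, &result, MPI_LONG_LONG_INT,
                             target, addr, win);
        MPI_Win_flush(target, win);                   // complete the CAS remotely
      }
      printf("rank %d acquired the lock\n", rank);
      // ... critical section: read/write the remote buffer header ...
      MPI_Compare_and_swap(&zero, &one, &result, MPI_LONG_LONG_INT,
                           target, addr, win);        // release: 1 -> 0
      MPI_Win_unlock(target, win);

      MPI_Win_detach(win, &control);
      MPI_Win_free(&win);
      MPI_Finalize();
      return 0;
    }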
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_client.hpp
r1639 → r1757

       static size_t maxRequestSize;

 -     CClientBuffer(MPI_Comm intercomm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize, StdSize maxBufferedEvents);
 +     CClientBuffer(MPI_Comm intercomm, vector<MPI_Win>& windows, int clientRank, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize);
       ~CClientBuffer();
 +     // void createWindows(MPI_Comm oneSidedComm) ;
 +     void freeWindows(void) ;
 +     void lockBuffer(void) ;
 +     void unlockBuffer(void) ;
 +
       bool isBufferFree(StdSize size);
 -     CBufferOut* getBuffer( StdSize size);
 -     bool checkBuffer( void);
 +     CBufferOut* getBuffer(size_t timeLine, StdSize size);
 +     bool checkBuffer(bool send=false);
       bool hasPendingRequest(void);
       StdSize remain(void);
 +     MPI_Aint getWinAddress(int numWindows) ;
 +     void infoBuffer(void) ;
 +     bool isNotifiedFinalized(void) ;
     private:
       char* buffer[2];
 +     char* bufferHeader[2];
 +     size_t* firstTimeLine[2] ;
 +     size_t* bufferCount[2] ;
 +     size_t* control[2] ;
 +     size_t* finalize[2] ;
 +     bool winState[2] ;
       int current;

       StdSize count;
 -     StdSize bufferedEvents;
       StdSize maxEventSize;
 -     const StdSize maxBufferedEvents;
       const StdSize bufferSize;
       const StdSize estimatedMaxEventSize;
 …
       const int serverRank;
 +     const int clientRank_;
       bool pending;
 …
       CBufferOut* retBuffer;
       const MPI_Comm interComm;
 +     std::vector<MPI_Win> windows_ ;
 +     bool hasWindows ;
 +     static const int headerSize=4*sizeof(size_t);
     };
   }
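For reference, the four size_t words reserved by headerSize above can be pictured as the following struct. This is an illustrative view only, not code from the changeset: buffer_client.cpp addresses these fields by word offset rather than through a struct.

    // Illustrative layout of the 4*sizeof(size_t) header that precedes each
    // exposed client buffer (matches the offsets used in buffer_client.cpp).
    #include <cstddef>

    struct BufferHeader
    {
      size_t firstTimeLine;  // word 0: timeline of the first buffered event
      size_t bufferCount;    // word 1: number of bytes currently buffered
      size_t control;        // word 2: lock word driven by MPI_Compare_and_swap
      size_t finalize;       // word 3: set to 1 by the server at finalize
    };
    // The payload starts right after the header:
    //   char* buffer = bufferHeader + sizeof(BufferHeader);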
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_server.cpp
r885 → r1757

     {

 -     CServerBuffer::CServerBuffer(StdSize buffSize)
 +     CServerBuffer::CServerBuffer(vector<MPI_Win>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize buffSize)
 +       : hasWindows(true), windows_(windows), windowsRank_(windowsRank), winAddress_(winAddress)
       {
         size = 3 * buffSize;
 …
         current = 1;
         end = size;
 +       used=0 ;
         buffer = new char[size]; // use MPI_ALLOC_MEM later?
 +       currentWindows=1 ;
 +       if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
       }
 …
       }

 +     void CServerBuffer::updateCurrentWindows(void)
 +     {
 +       if (currentWindows==0) currentWindows=1 ;
 +       else currentWindows=0 ;
 +     }
 +
 +/*
 +     void CServerBuffer::createWindows(MPI_Comm oneSidedComm)
 +     {
 +       MPI_Barrier(oneSidedComm) ;
 +       MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ;
 +       MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ;
 +       hasWindows=true ;
 +       updateCurrentWindows() ;
 +       MPI_Barrier(oneSidedComm) ;
 +     }
 +*/
 +
 +/*
 +     bool CServerBuffer::freeWindows()
 +     {
 +       if (hasWindows)
 +       {
 +         size_t header[3] ;
 +         size_t& control=header[2] ;
 +         MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows_[0]) ;
 +         MPI_Get(&control, 1, MPI_LONG_LONG_INT, windowsRank , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[0]) ;
 +         MPI_Win_unlock(0,windows[0]) ;
 +         if (control==2) // ok for free windows
 +         {
 +           MPI_Win_free( &(windows[0])) ;
 +           MPI_Win_free( &(windows[1])) ;
 +           hasWindows=false ;
 +           return true ;
 +         }
 +         else return false ;
 +       }
 +       else return true ;
 +     }
 +*/

       bool CServerBuffer::isBufferFree(size_t count)
 …
       }

 +     bool CServerBuffer::isBufferEmpty(void)
 +     {
 +       if (used==0) return true ;
 +       else return false;
 +     }

       void* CServerBuffer::getBuffer(size_t count)
 …
         }

 +       used+=count ;
         return ret ;
       }
 …
           }
         }
 -     }
 -
 +       used-=count ;
 +     }
 +
 +     bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count)
 +     {
 +       if (!hasWindows) return false ;
 +
 +       size_t header[3] ;
 +       size_t& clientTimeline=header[0] ;
 +       size_t& clientCount=header[1] ;
 +       size_t& control=header[2] ;
 +       bool ok=false ;
 +
 +       MPI_Group group ;
 +       int groupSize,groupRank ;
 +       MPI_Win_get_group(windows_[currentWindows], &group) ;
 +       MPI_Group_size(group, &groupSize) ;
 +       MPI_Group_rank(group, &groupRank) ;
 +
 +       lockBuffer();
 +
 +       // lock is acquired
 +
 +       MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],0), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
 +       MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
 +       MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;
 +
 +       // control=1 ;
 +       // MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
 +
 +       // MPI_Win_unlock(windowsRank_, windows_[currentWindows]) ;
 +       MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;
 +       // info(100)<<"getBufferFromClient : windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ;
 +       if (timeLine==clientTimeline)
 +       {
 +         // info(50)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ;
 +
 +         // MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ;
 +         buffer=(char*)getBuffer(clientCount) ;
 +         count=clientCount ;
 +         MPI_Get(buffer, clientCount, MPI_CHAR, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],4*sizeof(size_t)) , clientCount, MPI_CHAR, windows_[currentWindows]) ;
 +         clientTimeline = 0 ;
 +         clientCount = 0 ;
 +         // control=0 ;
 +         MPI_Put(&header[0], 2, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],0) , 2, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
 +
 +         // release lock
 +         unlockBuffer() ;
 +
 +         ok=true ;
 +         char checksum=0 ;
 +         for(size_t i=0;i<count;i++) checksum=checksum+buffer[i] ;
 +         char checksumFirst=0 ;
 +         for(size_t i=5; i<10 && i<count ;i++) checksumFirst=checksumFirst+buffer[i] ;
 +         char checksumLast=0 ;
 +         for(size_t i=(count<10)?0:count-10; i<count ; i++) checksumLast=checksumLast+buffer[i] ;
 +
 +         info(40)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "
 +                 <<clientTimeline<<" clientCount "<<count<<" checksum "<<(int)checksum<<" "
 +                 <<(int)buffer[0]<<" "<<(int)buffer[1]<<" "<<(int)buffer[2]<<" "<<(int)buffer[3]<<" "<<(int)buffer[4]<<" "<<(int)buffer[5]<<" "
 +                 <<(int)buffer[6]<<" "<<(int)buffer[7]<<" "<<(int)buffer[8]<<" "<<(int)buffer[9]<<" "<<(int)buffer[10]<<" "<<(int)buffer[11]<<endl ;
 +
 +       }
 +       else
 +       {
 +         //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ;
 +         //control=0 ;
 +         //MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
 +
 +         // release lock
 +         unlockBuffer() ;
 +       }
 +
 +       if (ok) return true ;
 +
 +       return false ;
 +     }
 +
 +     void CServerBuffer::lockBuffer(void)
 +     {
 +       if (!hasWindows) return ;
 +
 +       long long int lock=1 ;
 +       long long int zero=0, one=1 ;
 +       // control=1 ;
 +       MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ;
 +       while(lock!=0)
 +       {
 +         MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)),
 +                              windows_[currentWindows]) ;
 +         MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;
 +       }
 +     }
 +
 +     void CServerBuffer::unlockBuffer(void)
 +     {
 +       if (!hasWindows) return ;
 +       long long int lock=1 ;
 +       long long int zero=0, one=1 ;
 +
 +       MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)),
 +                            windows_[currentWindows]) ;
 +       MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;
 +       MPI_Win_unlock(windowsRank_,windows_[currentWindows]) ;
 +     }
 +
 +     void CServerBuffer::notifyClientFinalize(void)
 +     {
 +       if (!hasWindows) return ;
 +       size_t finalize=1 ;
 +       lockBuffer();
 +       // lock is acquired
 +       MPI_Put(&finalize, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],3*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
 +       unlockBuffer() ;
 +     }
     }
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_server.hpp
r717 → r1757

     {
     public:
 -     CServerBuffer( StdSize bufSize) ;
 +     CServerBuffer(vector<MPI_Win>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize bufSize) ;
       ~CServerBuffer() ;
 …
       void* getBuffer(size_t count) ;
       void freeBuffer(size_t count) ;
 -
 +     void createWindows(MPI_Comm oneSidedComm) ;
 +     bool freeWindows(void) ;
 +     bool getBufferFromClient(size_t timeLine, char* & buffer, size_t& count) ;
 +     bool isBufferEmpty(void) ;
 +     void updateCurrentWindows(void) ;
 +     void lockBuffer(void) ;
 +     void unlockBuffer(void) ;
 +     void notifyClientFinalize(void) ;
     private:
       char* buffer;
 …
       size_t end;
       size_t size;
 +     size_t used ; // count of element occupied
 +     std::vector<MPI_Win> windows_ ;
 +     std::vector<MPI_Aint> winAddress_ ;
 +
 +     int currentWindows ;
 +     bool hasWindows ;
 +     int windowsRank_ ;
     };
   }
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_client.cpp
r1639 → r1757

       : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4)
     {
 +     pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
 +     if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode
 +
       context = parent;
       intraComm = intraComm_;
 …
       computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader);

 -     timeLine = 0;
 +     if (flag) MPI_Intercomm_merge(interComm_,false,&interCommMerged) ;
 +
 +     if (!isAttachedModeEnabled())
 +     {
 +       windows.resize(serverSize) ;
 +       MPI_Comm winComm ;
 +       for(int rank=0; rank<serverSize; rank++)
 +       {
 +         windows[rank].resize(2) ;
 +         MPI_Comm_split(interCommMerged, rank, clientRank, &winComm);
 +         int myRank ;
 +         MPI_Comm_rank(winComm,&myRank);
 +         MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][0]);
 +         MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][1]);
 +         MPI_Comm_free(&winComm) ;
 +       }
 +     }
 +
 +     MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ;
 +
 +     timeLine = 1;
     }
 …
       list<int> sizes = event.getSizes();

       // We force the getBuffers call to be non-blocking on classical servers
       list<CBufferOut*> buffList;
 -     bool couldBuffer = getBuffers(ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0) ));
 -     // bool couldBuffer = getBuffers(ranks, sizes, buffList, CXios::isServer );
 -
 -     if (couldBuffer)
 -     {
 -       event.send(timeLine, sizes, buffList);
 -       info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<" sent"<<endl ;
 -
 -       checkBuffers(ranks);
 -
 -       if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
 -       {
 -         waitEvent(ranks);
 -         CContext::setCurrent(context->getId());
 -       }
 -     }
 -     else
 -     {
 -       tmpBufferedEvent.ranks = ranks;
 -       tmpBufferedEvent.sizes = sizes;
 -
 -       for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++)
 -         tmpBufferedEvent.buffers.push_back(new CBufferOut(*it));
 -       info(100)<<"DEBUG : temporaly event created : timeline "<<timeLine<<endl ;
 -       event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers);
 -       info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<" sent"<<endl ;
 -     }
 +     getBuffers(timeLine, ranks, sizes, buffList) ;
 +
 +     event.send(timeLine, sizes, buffList);
 +
 +     //for (auto itRank = ranks.begin(); itRank != ranks.end(); itRank++) buffers[*itRank]->infoBuffer() ;
 +
 +     unlockBuffers(ranks) ;
 +     info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<" sent"<<endl ;
 +
 +     checkBuffers(ranks);
 +
 +     if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode
 +     {
 +       waitEvent(ranks);
 +       CContext::setCurrent(context->getId());
 +     }

       timeLine++;
     }
 -
 -   /*!
 -    * Send the temporarily buffered event (if any).
 -    *
 -    * \return true if a temporarily buffered event could be sent, false otherwise
 -    */
 -   bool CContextClient::sendTemporarilyBufferedEvent()
 -   {
 -     bool couldSendTmpBufferedEvent = false;
 -
 -     if (hasTemporarilyBufferedEvent())
 -     {
 -       list<CBufferOut*> buffList;
 -       if (getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) // Non-blocking call
 -       {
 -         list<CBufferOut*>::iterator it, itBuffer;
 -
 -         for (it = tmpBufferedEvent.buffers.begin(), itBuffer = buffList.begin(); it != tmpBufferedEvent.buffers.end(); it++, itBuffer++)
 -           (*itBuffer)->put((char*)(*it)->start(), (*it)->count());
 -
 -         info(100)<<"DEBUG : temporaly event sent "<<endl ;
 -         checkBuffers(tmpBufferedEvent.ranks);
 -
 -         tmpBufferedEvent.clear();
 -
 -         couldSendTmpBufferedEvent = true;
 -       }
 -     }
 -
 -     return couldSendTmpBufferedEvent;
 -   }
 …
      * it is explicitly requested to be non-blocking.
      *
 +    *
 +    * \param [in] timeLine time line of the event which will be sent to servers
      * \param [in] serverList list of rank of connected server
      * \param [in] sizeList size of message corresponding to each connection
 …
      * \return whether the already allocated buffers could be used
      */
 -   bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers,
 +   bool CContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers,
                                     bool nonBlocking /*= false*/)
     {
 …
         areBuffersFree = true;
         for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
 +       {
           areBuffersFree &= (*itBuffer)->isBufferFree(*itSize);
 +       }

         if (!areBuffersFree)
         {
 +         for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer();
           checkBuffers();
 -         if (CServer::serverLevel == 0)
 -           context->server->listen();
 -
 +         if (CServer::serverLevel == 0) context->server->listen();
           else if (CServer::serverLevel == 1)
           {
             context->server->listen();
 -           for (int i = 0; i < context->serverPrimServer.size(); ++i)
 -             context->serverPrimServer[i]->listen();
 +           for (int i = 0; i < context->serverPrimServer.size(); ++i) context->serverPrimServer[i]->listen();
             CServer::contextEventLoop(false) ; // avoid dead-lock at finalize...
           }
 -
 -         else if (CServer::serverLevel == 2)
 -           context->server->listen();
 +         else if (CServer::serverLevel == 2) context->server->listen();

         }
       } while (!areBuffersFree && !nonBlocking);
 -
       CTimer::get("Blocking time").suspend();
 …
       {
         for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++)
 -         retBuffers.push_back((*itBuffer)->getBuffer(*itSize));
 -     }
 -
 +         retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize));
 +     }
       return areBuffersFree;
     }
 …
         maxEventSizes[rank] = CXios::minBufferSize;
       }
 -     CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank], maxBufferedEvents);
 +
 +     vector<MPI_Win> Wins(2,MPI_WIN_NULL) ;
 +     if (!isAttachedModeEnabled()) Wins=windows[rank] ;
 +
 +     CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, Wins, clientRank, rank, mapBufferSize_[rank], maxEventSizes[rank]);
       // Notify the server
 -     CBufferOut* bufOut = buffer->getBuffer(sizeof(StdSize));
 -     bufOut->put(mapBufferSize_[rank]); // Stupid C++
 -     buffer->checkBuffer();
 +     CBufferOut* bufOut = buffer->getBuffer(0, 3*sizeof(MPI_Aint));
 +     MPI_Aint sendBuff[3] ;
 +     sendBuff[0]=mapBufferSize_[rank]; // Stupid C++
 +     sendBuff[1]=buffers[rank]->getWinAddress(0);
 +     sendBuff[2]=buffers[rank]->getWinAddress(1);
 +     info(100)<<"CContextClient::newBuffer : rank "<<rank<<" winAdress[0] "<<buffers[rank]->getWinAddress(0)<<" winAdress[1] "<<buffers[rank]->getWinAddress(1)<<endl;
 +     bufOut->put(sendBuff, 3); // Stupid C++
 +     buffer->checkBuffer(true);
 +
 +/*
 +     if (!isAttachedModeEnabled()) // create windows only in server mode
 +     {
 +       MPI_Comm OneSidedInterComm, oneSidedComm ;
 +       MPI_Intercomm_create(commSelf, 0, interCommMerged, clientSize+rank, 0, &OneSidedInterComm );
 +       MPI_Intercomm_merge(OneSidedInterComm,false,&oneSidedComm);
 +       buffer->createWindows(oneSidedComm) ;
 +     }
 +*/
     }
 …
       bool pending = false;
       for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
 -       pending |= itBuff->second->checkBuffer();
 +       pending |= itBuff->second->checkBuffer(!pureOneSided);
       return pending;
     }
 …
       for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
       {
         delete itBuff->second;
       }
       buffers.clear();
 -   }
 -
 +
 +/* don't know when release windows
 +
 +     if (!isAttachedModeEnabled())
 +     {
 +       for(int rank=0; rank<serverSize; rank++)
 +       {
 +         MPI_Win_free(&windows[rank][0]);
 +         MPI_Win_free(&windows[rank][1]);
 +       }
 +     }
 +   }
 +*/
 +
 +   /*!
 +     Lock the buffers for one sided communications
 +     \param [in] ranks list rank of server to which client connects to
 +   */
 +   void CContextClient::lockBuffers(list<int>& ranks)
 +   {
 +     list<int>::iterator it;
 +     for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->lockBuffer();
 +   }
 +
 +   /*!
 +     Unlock the buffers for one sided communications
 +     \param [in] ranks list rank of server to which client connects to
 +   */
 +   void CContextClient::unlockBuffers(list<int>& ranks)
 +   {
 +     list<int>::iterator it;
 +     for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->unlockBuffer();
 +   }
 +
     /*!
       Verify state of buffers corresponding to a connection
 …
       list<int>::iterator it;
       bool pending = false;
 -     for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer();
 +     for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(!pureOneSided);
       return pending;
     }
 …
       mapBufferSize_ = mapSize;
       maxEventSizes = maxEventSize;
 -
 -     // Compute the maximum number of events that can be safely buffered.
 -     double minBufferSizeEventSizeRatio = std::numeric_limits<double>::max();
 -     for (std::map<int,StdSize>::const_iterator it = mapSize.begin(), ite = mapSize.end(); it != ite; ++it)
 -     {
 -       double ratio = double(it->second) / maxEventSizes[it->first];
 -       if (ratio < minBufferSizeEventSizeRatio) minBufferSizeEventSizeRatio = ratio;
 -     }
 -     MPI_Allreduce(MPI_IN_PLACE, &minBufferSizeEventSizeRatio, 1, MPI_DOUBLE, MPI_MIN, intraComm);
 -
 -     if (minBufferSizeEventSizeRatio < 1.0)
 -     {
 -       ERROR("void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)",
 -             << "The buffer sizes and the maximum events sizes are incoherent.");
 -     }
 -     else if (minBufferSizeEventSizeRatio == std::numeric_limits<double>::max())
 -       minBufferSizeEventSizeRatio = 1.0; // In this case, maxBufferedEvents will never be used but we want to avoid any floating point exception
 -
 -     maxBufferedEvents = size_t(2 * minBufferSizeEventSizeRatio) // there is room for two local buffers on the server
 -                       + size_t(minBufferSizeEventSizeRatio) // one local buffer can always be fully used
 -                       + 1; // the other local buffer might contain only one event
     }
 …
     {
       map<int,CClientBuffer*>::iterator itBuff;
 +     std::list<int>::iterator ItServerLeader;
 +
       bool stop = false;

 -     CTimer::get("Blocking time").resume();
 -     while (hasTemporarilyBufferedEvent())
 -     {
 -       checkBuffers();
 -       sendTemporarilyBufferedEvent();
 -     }
 -     CTimer::get("Blocking time").suspend();
 -
 +     int* nbServerConnectionLocal = new int[serverSize] ;
 +     int* nbServerConnectionGlobal = new int[serverSize] ;
 +     for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ;
 +     for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) nbServerConnectionLocal[itBuff->first]=1 ;
 +     for (ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++) nbServerConnectionLocal[*ItServerLeader]=1 ;
 +
 +     MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm);
 +
       CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE);
 +     CMessage msg;
 +
 +     for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ;
 +     sendEvent(event);
 +
 +     delete[] nbServerConnectionLocal ;
 +     delete[] nbServerConnectionGlobal ;
 +/*
       if (isServerLeader())
       {
 …
       }
       else sendEvent(event);
 +*/

       CTimer::get("Blocking time").resume();
 -     // while (!stop)
 -     {
 -       checkBuffers();
 -       if (hasTemporarilyBufferedEvent())
 -         sendTemporarilyBufferedEvent();
 -
 -       stop = true;
 -       // for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest();
 -     }
 +     checkBuffers();
       CTimer::get("Blocking time").suspend();
 …
       return pending;
     }
 +
 +   bool CContextClient::isNotifiedFinalized(void)
 +   {
 +     if (isAttachedModeEnabled()) return true ;
 +
 +     bool finalized = true;
 +     map<int,CClientBuffer*>::iterator itBuff;
 +     for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)
 +       finalized &= itBuff->second->isNotifiedFinalized();
 +     return finalized;
 +   }

   }
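The constructor change above builds one pair of dynamic windows per server by splitting the merged client+server communicator once per server rank: at iteration r every client passes color r while server r passes its own rank as color, so the resulting communicator contains all clients plus that one server. A condensed, illustrative sketch of the client side follows; it is simplified from the code above and is not the XIOS API.

    // Sketch: one pair of dynamic windows per server, each over a
    // clients+one-server communicator carved out of the merged intracomm.
    #include <mpi.h>
    #include <array>
    #include <vector>

    std::vector<std::array<MPI_Win, 2>>
    createServerWindows(MPI_Comm interCommMerged, int clientRank, int serverSize)
    {
      std::vector<std::array<MPI_Win, 2>> windows(serverSize);
      for (int rank = 0; rank < serverSize; ++rank)
      {
        MPI_Comm winComm;
        // Color 'rank': all clients pick this color in iteration 'rank', and
        // server 'rank' picks the same color on its side, so winComm groups
        // the full client set with exactly that server.
        MPI_Comm_split(interCommMerged, rank, clientRank, &winComm);
        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][0]);
        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][1]);
        MPI_Comm_free(&winComm);  // the windows keep their own reference
      }
      return windows;
    }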
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_client.hpp
r1639 → r1757

       // Send event to server
       void sendEvent(CEventClient& event);
 -     bool sendTemporarilyBufferedEvent();
       void waitEvent(list<int>& ranks);

       // Functions to set/get buffers
 -     bool getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false);
 +     bool getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false);
       void newBuffer(int rank);
       bool checkBuffers(list<int>& ranks);
 …
       bool isAttachedModeEnabled() const;
 -     bool hasTemporarilyBufferedEvent() const { return !tmpBufferedEvent.isEmpty(); };

       static void computeLeader(int clientRank, int clientSize, int serverSize,
 …
       // Close and finalize context client
       // void closeContext(void);  Never been implemented.
 +     bool isNotifiedFinalized(void) ;
       void finalize(void);
 …
       int serverSize; //!< Size of server group

 -     MPI_Comm interComm; //!< Communicator of server group
 +     MPI_Comm interComm; //!< Communicator of server group (interCommunicator)
 +
 +     MPI_Comm interCommMerged; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication.

       MPI_Comm intraComm; //!< Communicator of client group
 +
 +     MPI_Comm commSelf; //!< Communicator of the client alone. Needed to create a new communicator between 1 proc client and 1 proc server for one sided communication

       map<int,CClientBuffer*> buffers; //!< Buffers for connection to servers

 +     bool pureOneSided ; //!< if true, client will communicate with servers only through one sided communication. Otherwise the hybrid mode P2P / one sided is used.
 +
     private:
 +     void lockBuffers(list<int>& ranks) ;
 +     void unlockBuffers(list<int>& ranks) ;
 +
       //! Mapping of server and buffer size for each connection to server
       std::map<int,StdSize> mapBufferSize_;
 …
       //! Maximum number of events that can be buffered
       StdSize maxBufferedEvents;
 -
 -     struct {
 -       std::list<int> ranks, sizes;
 -       std::list<CBufferOut*> buffers;
 -
 -       bool isEmpty() const { return ranks.empty(); };
 -       void clear() {
 -         ranks.clear();
 -         sizes.clear();
 -
 -         for (std::list<CBufferOut*>::iterator it = buffers.begin(); it != buffers.end(); it++)
 -           delete *it;
 -
 -         buffers.clear();
 -       };
 -     } tmpBufferedEvent; //! Event temporarily buffered (used only on the server)

       //! Context for server (Only used in attached mode)
 …
       std::list<int> ranksServerNotLeader;

 +     std::vector<std::vector<MPI_Win> > windows ; //! one sided mpi windows to expose client buffers to servers == windows[nbServers][2]
 +
     };
   }
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_server.cpp
r1639 → r1757

       int flag;
       MPI_Comm_test_inter(interComm,&flag);
 +
 +     if (flag) attachedMode=false ;
 +     else attachedMode=true ;
 +
       if (flag) MPI_Comm_remote_size(interComm,&commSize);
       else MPI_Comm_size(interComm,&commSize);

 -     currentTimeLine=0;
 +     currentTimeLine=1;
       scheduled=false;
       finished=false;
 …
       else
         hashId=hashString(context->getId());
 -   }
 -
 +
 +     if (!isAttachedModeEnabled())
 +     {
 +       MPI_Intercomm_merge(interComm_,true,&interCommMerged) ;
 +       // create windows for one sided comm
 +       int interCommMergedRank;
 +       MPI_Comm winComm ;
 +       MPI_Comm_rank(intraComm, &interCommMergedRank);
 +       windows.resize(2) ;
 +       for(int rank=commSize; rank<commSize+intraCommSize; rank++)
 +       {
 +         if (rank==commSize+interCommMergedRank)
 +         {
 +           MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm);
 +           int myRank ;
 +           MPI_Comm_rank(winComm,&myRank);
 +           MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[0]);
 +           MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]);
 +         }
 +         else MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm);
 +         MPI_Comm_free(&winComm) ;
 +       }
 +     }
 +     else
 +     {
 +       windows.resize(2) ;
 +       windows[0]=MPI_WIN_NULL ;
 +       windows[1]=MPI_WIN_NULL ;
 +     }
 +
 +     MPI_Comm_split(intraComm_,intraCommRank,intraCommRank, &commSelf) ;
 +     itLastTimeLine=lastTimeLine.begin() ;
 +
 +     pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
 +     if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode
 +   }
 +
 +   //! Attached mode is used ?
 +   //! \return true if attached mode is used, false otherwise
 +   bool CContextServer::isAttachedModeEnabled() const
 +   {
 +     return attachedMode ;
 +   }
 +
     void CContextServer::setPendingEvent(void)
     {
 …
       listen();
       checkPendingRequest();
 -     if (enableEventsProcessing)
 -       processEvents();
 +     if (enableEventsProcessing) processEvents();
       return finished;
     }
 …
       if (it==buffers.end()) // Receive the buffer size and allocate the buffer
       {
 -       StdSize buffSize = 0;
 -       MPI_Recv(&buffSize, 1, MPI_LONG, rank, 20, interComm, &status);
 +       MPI_Aint recvBuff[3] ;
 +       MPI_Recv(recvBuff, 3, MPI_AINT, rank, 20, interComm, &status);
 +       StdSize buffSize = recvBuff[0];
 +       vector<MPI_Aint> winAdress(2) ;
 +       winAdress[0]=recvBuff[1] ; winAdress[1]=recvBuff[2] ;
         mapBufferSize_.insert(std::make_pair(rank, buffSize));
 -       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first;
 +       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first;
 +/*
 +       if (!isAttachedModeEnabled())
 +       {
 +         MPI_Comm OneSidedInterComm, oneSidedComm ;
 +         MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0, &OneSidedInterComm );
 +         MPI_Intercomm_merge(OneSidedInterComm,true,&oneSidedComm);
 +         buffers[rank]->createWindows(oneSidedComm) ;
 +       }
 +*/
 +       lastTimeLine[rank]=0 ;
 +       itLastTimeLine=lastTimeLine.begin() ;
 +
         return true;
       }
 …
       if (flag==true)
       {
 +       buffers[rank]->updateCurrentWindows() ;
         recvRequest.push_back(rank);
         MPI_Get_count(&status,MPI_CHAR,&count);
 …
     }

 +   void CContextServer::getBufferFromClient(size_t timeLine)
 +   {
 +     if (!isAttachedModeEnabled()) // one sided desactivated in attached mode
 +     {
 +       int rank ;
 +       char *buffer ;
 +       size_t count ;
 +
 +       if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ;
 +       for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine)
 +       {
 +         rank=itLastTimeLine->first ;
 +         if (itLastTimeLine->second < timeLine && pendingRequest.count(rank)==0)
 +         {
 +           if (buffers[rank]->getBufferFromClient(timeLine, buffer, count))
 +           {
 +             processRequest(rank, buffer, count);
 +             break ;
 +           }
 +         }
 +       }
 +     }
 +   }
 +
     void CContextServer::processRequest(int rank, char* buff,int count)
     {
 …
       char* startBuffer,endBuffer;
       int size, offset;
 -     size_t timeLine ;
 +     size_t timeLine=0;
       map<size_t,CEventServer*>::iterator it;

 +
       CTimer::get("Process request").resume();
       while(count>0)
 …
         CBufferIn newBuffer(startBuffer,buffer.remain());
         newBuffer>>size>>timeLine;
 -
         it=events.find(timeLine);
         if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first;
 …
         count=buffer.remain();
       }
 +
 +     if (timeLine>0) lastTimeLine[rank]=timeLine ;
 +
       CTimer::get("Process request").suspend();
     }
 …
           }
         }
 -     }
 +       else getBufferFromClient(currentTimeLine) ;
 +     }
 +     else if (pureOneSided) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line
     }
 …
       map<int,CServerBuffer*>::iterator it;
       for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
     }

 +   void CContextServer::releaseBuffers()
 +   {
 +     map<int,CServerBuffer*>::iterator it;
 +     bool out ;
 +     do
 +     {
 +       out=true ;
 +       for(it=buffers.begin();it!=buffers.end();++it)
 +       {
 +         // out = out && it->second->freeWindows() ;
 +       }
 +     } while (! out) ;
 +   }
 +
 +   void CContextServer::notifyClientsFinalize(void)
 +   {
 +     for(auto it=buffers.begin();it!=buffers.end();++it)
 +     {
 +       it->second->notifyClientFinalize() ;
 +     }
 +   }
 …
         finished=true;
         info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
 +       // releaseBuffers() ;
 +       notifyClientsFinalize() ;
         context->finalize();
 +
 +/* don't know where release windows
 +       MPI_Win_free(&windows[0]) ;
 +       MPI_Win_free(&windows[1]) ;
 +*/
         std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                                                iteMap = mapBufferSize_.end(), itMap;
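notifyClientsFinalize() above closes the new shutdown handshake: the server writes 1 into the fourth header word of each client's exposed buffer, and the client polls that word through isNotifiedFinalized() before releasing its buffers and windows. A rough sketch of the two halves is shown below, assuming the same header layout; names are illustrative, and the real client re-reads the flag under lockBuffer()/unlockBuffer(), which this sketch omits.

    // Sketch of the one-sided finalize notification.
    #include <mpi.h>
    #include <cstddef>

    // Server side: put 1 into the client's header word 3. 'win' and
    // 'headerBase' are assumed to come from the connection handshake.
    void notifyFinalize(MPI_Win win, int clientRank, MPI_Aint headerBase)
    {
      long long finalize = 1;
      MPI_Win_lock(MPI_LOCK_SHARED, clientRank, 0, win);
      MPI_Put(&finalize, 1, MPI_LONG_LONG_INT, clientRank,
              MPI_Aint_add(headerBase, 3 * sizeof(size_t)),
              1, MPI_LONG_LONG_INT, win);
      MPI_Win_unlock(clientRank, win);  // unlock completes the put at the target
    }

    // Client side: poll the local header word targeted by notifyFinalize().
    // Shown as a plain read only to illustrate the flag being tested.
    bool isNotified(const volatile size_t* finalizeWord)
    {
      return *finalizeWord == 1;
    }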
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_server.hpp
r1639 → r1757

       bool listenPendingRequest(MPI_Status& status) ;
       void checkPendingRequest(void) ;
 +     void getBufferFromClient(size_t timeLine) ;
       void processRequest(int rank, char* buff,int count) ;
       void processEvents(void) ;
 …
       void setPendingEvent(void) ;
       bool hasPendingEvent(void) ;
 -
 +     bool isAttachedModeEnabled() const;
 +     void releaseBuffers(void) ;
 +     void notifyClientsFinalize(void) ;
 +
       MPI_Comm intraComm ;
       int intraCommSize ;
 …
       int commSize ;

 +     MPI_Comm interCommMerged; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication.
 +
 +     MPI_Comm commSelf; //!< Communicator of the server alone. Needed to create a new communicator between 1 proc client and 1 proc server for one sided communication
 +
       map<int,CServerBuffer*> buffers ;
 +     map<int,size_t> lastTimeLine ; //!< last event time line for a processed request
 +     map<int,size_t>::iterator itLastTimeLine ; //!< iterator on lastTimeLine
       map<int,MPI_Request> pendingRequest ;
       map<int,char*> bufferRequest ;
 …
       bool pendingEvent ;
       bool scheduled ; /*!< event of current timeline is alreading scheduled ? */
 +     bool attachedMode ; //! true if attached mode is enabled otherwise false
 +     bool pureOneSided ; //!< if true, client will communicate with servers only through one sided communication. Otherwise the hybrid mode P2P / one sided is used.
 +
       size_t hashId ;
 …
     private:
       std::map<int, StdSize> mapBufferSize_;
 +     vector<MPI_Win> windows ; //! one sided mpi windows to expose client buffers to servers ; No memory will be attached on server side.
 +
     } ;
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/node/context.cpp
r1639 → r1757

       {
         client->checkBuffers();
 -       bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent();
 -       if (hasTmpBufferedEvent)
 -         hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent();
 -       // Don't process events if there is a temporarily buffered event
 -       return server->eventLoop(!hasTmpBufferedEvent || !enableEventsProcessing);
 +       return server->eventLoop(true);
       }
       else if (CServer::serverLevel == 1)
       {
 -       if (!finalized)
 -         client->checkBuffers();
 +       if (!finalized) client->checkBuffers();
         bool serverFinished = true;
 -       if (!finalized)
 -         serverFinished = server->eventLoop(enableEventsProcessing);
 +       if (!finalized) serverFinished = server->eventLoop(enableEventsProcessing);
         bool serverPrimFinished = true;
         for (int i = 0; i < clientPrimServer.size(); ++i)
         {
 -         if (!finalized)
 -           clientPrimServer[i]->checkBuffers();
 -         if (!finalized)
 -           serverPrimFinished *= serverPrimServer[i]->eventLoop(enableEventsProcessing);
 +         if (!finalized) clientPrimServer[i]->checkBuffers();
 +         if (!finalized) serverPrimFinished *= serverPrimServer[i]->eventLoop(enableEventsProcessing);
         }
         return ( serverFinished && serverPrimFinished);
 …
         ++countChildCtx_;

 +       info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ;
         client->finalize();
 -       while (client->havePendingRequests())
 -         client->checkBuffers();
 -
 +       info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ;
 +       while (client->havePendingRequests()) client->checkBuffers();
 +
 +       info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ;
         while (!server->hasFinished())
           server->eventLoop();
 +       info(100)<<"DEBUG: context "<<getId()<<" server has finished"<<endl ;
 +
 +       bool notifiedFinalized=false ;
 +       do
 +       {
 +         notifiedFinalized=client->isNotifiedFinalized() ;
 +       } while (!notifiedFinalized) ;
 +       client->releaseBuffers();

         if (hasServer) // Mode attache
 …

         //! Deallocate client buffers
 -       client->releaseBuffers();
 -
 +       // client->releaseBuffers();
 +       info(100)<<"DEBUG: context "<<getId()<<" release client ok"<<endl ;
         //! Free internally allocated communicators
         for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
 …
         if (countChildCtx_ == 0)
           for (int i = 0; i < clientPrimServer.size(); ++i)
 +         {
             clientPrimServer[i]->finalize();
 +           bool bufferReleased;
 +           do
 +           {
 +             clientPrimServer[i]->checkBuffers();
 +             bufferReleased = !clientPrimServer[i]->havePendingRequests();
 +           } while (!bufferReleased);
 +
 +           bool notifiedFinalized=false ;
 +           do
 +           {
 +             // clientPrimServer[i]->checkBuffers();
 +             notifiedFinalized=clientPrimServer[i]->isNotifiedFinalized() ;
 +           } while (!notifiedFinalized) ;
 +           clientPrimServer[i]->releaseBuffers();
 +         }

         // (Last) context finalized message received
 …
         {
           // Blocking send of context finalize message to its client (e.g. primary server or model)
 -         info(100)<<"DEBUG: context "<<getId()<<" Send client finalize <<"<<endl ;
 +         info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ;
           client->finalize();
 +         info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ;
           bool bufferReleased;
           do
 …
             bufferReleased = !client->havePendingRequests();
           } while (!bufferReleased);
 +
 +         bool notifiedFinalized=false ;
 +         do
 +         {
 +           // client->checkBuffers();
 +           notifiedFinalized=client->isNotifiedFinalized() ;
 +         } while (!notifiedFinalized) ;
 +         client->releaseBuffers();
 +
           finalized = true;
 -
 +         info(100)<<"DEBUG: context "<<getId()<<" bufferRelease OK"<<endl ;
 +
           closeAllFile(); // Just move to here to make sure that server-level 1 can close files
           if (hasServer && !hasClient)
 …

           //! Deallocate client buffers
 -         client->releaseBuffers();
 +         // client->releaseBuffers();
 +         info(100)<<"DEBUG: context "<<getId()<<" client release"<<endl ;
 +
 +/*
           for (int i = 0; i < clientPrimServer.size(); ++i)
             clientPrimServer[i]->releaseBuffers();
 -
 +*/
           //! Free internally allocated communicators
           for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it)
Note: See TracChangeset for help on using the changeset viewer.