Changeset 1547 for XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_client.cpp
- Timestamp:
- 06/20/18 09:09:23 (6 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_client.cpp
r1227 r1547 12 12 size_t CClientBuffer::maxRequestSize = 0; 13 13 14 CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize , StdSize maxBufferedEvents)14 CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize) 15 15 : interComm(interComm) 16 16 , serverRank(serverRank) … … 20 20 , current(0) 21 21 , count(0) 22 , bufferedEvents(0)23 , maxBufferedEvents(maxBufferedEvents)24 22 , pending(false) 25 { 26 buffer[0] = new char[bufferSize]; // transform it with MPI_ALLOC_MEM later 27 buffer[1] = new char[bufferSize]; 23 , hasWindows(false) 24 { 25 MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ; 26 MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ; 27 28 29 buffer[0] = bufferHeader[0]+headerSize ; 30 buffer[1] = bufferHeader[1]+headerSize ; 31 firstTimeLine[0]=(size_t*)bufferHeader[0] ; 32 firstTimeLine[1]=(size_t*)bufferHeader[1] ; 33 bufferCount[0]=(size_t*)bufferHeader[0] +1 ; 34 bufferCount[1]=(size_t*)bufferHeader[1] +1 ; 35 control[0]=(size_t*)bufferHeader[0] +2 ; 36 control[1]=(size_t*)bufferHeader[1] +2 ; 37 38 *firstTimeLine[0]=0 ; 39 *firstTimeLine[1]=0 ; 40 *bufferCount[0]=0 ; 41 *bufferCount[1]=0 ; 42 *control[0]=0 ; 43 *control[1]=0 ; 44 winState[0]=false ; 45 winState[1]=false ; 28 46 retBuffer = new CBufferOut(buffer[current], bufferSize); 29 info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << " with a maximum of " << maxBufferedEvents << " buffered events" <<endl;47 info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl; 30 48 } 31 49 32 50 CClientBuffer::~CClientBuffer() 33 51 { 34 delete [] buffer[0]; 35 delete [] buffer[1]; 36 delete retBuffer; 52 freeWindows() ; 53 MPI_Free_mem(bufferHeader[0]) ; 54 MPI_Free_mem(bufferHeader[1]) ; 55 delete retBuffer; 56 } 57 58 void CClientBuffer::createWindows(MPI_Comm oneSidedComm) 59 { 60 MPI_Barrier(oneSidedComm) ; 61 MPI_Win_create(bufferHeader[0], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ; 62 MPI_Win_create(bufferHeader[1], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ; 63 64 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ; 65 *firstTimeLine[0]=0 ; 66 *bufferCount[0]=0 ; 67 *control[0]=0 ; 68 MPI_Win_unlock(0, windows[0]) ; 69 70 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ; 71 *firstTimeLine[1]=0 ; 72 *bufferCount[1]=0 ; 73 *control[1]=0 ; 74 MPI_Win_unlock(0, windows[1]) ; 75 winState[0]=false ; 76 winState[1]=false ; 77 MPI_Barrier(oneSidedComm) ; 78 hasWindows=true ; 79 } 80 81 void CClientBuffer::freeWindows() 82 { 83 if (hasWindows) 84 { 85 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ; 86 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ; 87 *control[0]=2 ; 88 *control[1]=2 ; 89 MPI_Win_unlock(0, windows[1]) ; 90 MPI_Win_unlock(0, windows[0]) ; 91 92 MPI_Win_free(&windows[0]) ; 93 MPI_Win_free(&windows[1]) ; 94 hasWindows=false ; 95 } 96 } 97 98 void CClientBuffer::lockBuffer(void) 99 { 100 if (hasWindows) 101 { 102 MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[current]) ; 103 winState[current]=true ; 104 } 105 } 106 107 void CClientBuffer::unlockBuffer(void) 108 { 109 if (hasWindows) 110 { 111 MPI_Win_unlock(0, windows[current]) ; 112 winState[current]=false ; 113 } 37 114 } 38 115 … … 44 121 bool CClientBuffer::isBufferFree(StdSize size) 45 122 { 123 bool loop=true ; 124 while (loop) 125 { 126 lockBuffer(); 127 if (*control[current]==0) loop=false ; // attemp to read from server ? 128 else unlockBuffer() ; 129 } 130 46 131 if (size > bufferSize) 47 132 ERROR("bool CClientBuffer::isBufferFree(StdSize size)", … … 59 144 } 60 145 61 62 return (size <= remain() && bufferedEvents < maxBufferedEvents);63 } 64 65 66 CBufferOut* CClientBuffer::getBuffer( StdSize size)146 count=*bufferCount[current] ; 147 return (size <= remain()); 148 } 149 150 151 CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size) 67 152 { 68 153 if (size <= remain()) 69 154 { 155 info(100)<<"count "<<count<<" bufferCount[current] "<<*bufferCount[current]<<endl ; 70 156 retBuffer->realloc(buffer[current] + count, size); 71 157 count += size; 72 bufferedEvents++; 158 if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ; 159 *bufferCount[current]=count ; 73 160 return retBuffer; 74 161 } … … 81 168 } 82 169 83 bool CClientBuffer::checkBuffer( void)170 bool CClientBuffer::checkBuffer(bool send) 84 171 { 85 172 MPI_Status status; … … 96 183 if (!pending) 97 184 { 185 if (!send) return false ; 98 186 if (count > 0) 99 187 { 100 MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 101 pending = true; 102 if (current == 1) current = 0; 103 else current = 1; 104 count = 0; 105 bufferedEvents = 0; 188 lockBuffer() ; 189 if (*control[current]==0 && bufferCount[current] > 0) 190 { 191 MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 192 pending = true; 193 *control[current]=0 ; 194 *firstTimeLine[current]=0 ; 195 *bufferCount[current]=0 ; 196 197 unlockBuffer() ; 198 199 if (current == 1) current = 0; 200 else current = 1; 201 count = 0; 202 } 203 else unlockBuffer() ; 106 204 } 107 205 } … … 112 210 bool CClientBuffer::hasPendingRequest(void) 113 211 { 212 213 lockBuffer() ; 214 count=*bufferCount[current] ; 215 unlockBuffer() ; 216 114 217 return (pending || count > 0); 115 218 } 219 220 116 221 }
Note: See TracChangeset
for help on using the changeset viewer.