Ignore:
Timestamp:
06/20/18 09:09:23 (6 years ago)
Author:
ymipsl
Message:

New communication protocol between clients and servers, using a hybrid mode of p2p mixed with one-sided communication in order to avoid deadlocking. The constraint on the maximum number of events that can be buffered on the client side is lifted.

A dev branch has been created so the changes can be tested before merging.

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_client.cpp

    r1227 r1547  
    1212  size_t CClientBuffer::maxRequestSize = 0; 
    1313 
    14   CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize, StdSize maxBufferedEvents) 
     14  CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize) 
    1515    : interComm(interComm) 
    1616    , serverRank(serverRank) 
     
    2020    , current(0) 
    2121    , count(0) 
    22     , bufferedEvents(0) 
    23     , maxBufferedEvents(maxBufferedEvents) 
    2422    , pending(false) 
    25   { 
    26     buffer[0] = new char[bufferSize]; // transform it with MPI_ALLOC_MEM later 
    27     buffer[1] = new char[bufferSize]; 
     23    , hasWindows(false)  
     24  { 
     25      MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ; 
     26      MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ; 
     27 
     28 
     29    buffer[0] = bufferHeader[0]+headerSize ; 
     30    buffer[1] = bufferHeader[1]+headerSize ; 
     31    firstTimeLine[0]=(size_t*)bufferHeader[0] ; 
     32    firstTimeLine[1]=(size_t*)bufferHeader[1] ; 
     33    bufferCount[0]=(size_t*)bufferHeader[0] +1 ; 
     34    bufferCount[1]=(size_t*)bufferHeader[1] +1 ; 
     35    control[0]=(size_t*)bufferHeader[0] +2 ; 
     36    control[1]=(size_t*)bufferHeader[1] +2 ; 
     37 
     38    *firstTimeLine[0]=0 ; 
     39    *firstTimeLine[1]=0 ; 
     40    *bufferCount[0]=0 ; 
     41    *bufferCount[1]=0 ; 
     42    *control[0]=0 ; 
     43    *control[1]=0 ; 
     44    winState[0]=false ; 
     45    winState[1]=false ; 
    2846    retBuffer = new CBufferOut(buffer[current], bufferSize); 
    29     info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << " with a maximum of " << maxBufferedEvents << " buffered events" << endl; 
     47    info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl; 
    3048  } 
    3149 
    3250  CClientBuffer::~CClientBuffer() 
    3351  { 
    34    delete [] buffer[0]; 
    35    delete [] buffer[1]; 
    36    delete retBuffer; 
     52     freeWindows() ; 
     53     MPI_Free_mem(bufferHeader[0]) ; 
     54     MPI_Free_mem(bufferHeader[1]) ; 
     55     delete retBuffer; 
     56  } 
     57 
     58  void CClientBuffer::createWindows(MPI_Comm oneSidedComm) 
     59  { 
     60    MPI_Barrier(oneSidedComm) ; 
     61    MPI_Win_create(bufferHeader[0], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ; 
     62    MPI_Win_create(bufferHeader[1], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ; 
     63 
     64    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ; 
     65    *firstTimeLine[0]=0 ; 
     66    *bufferCount[0]=0 ; 
     67    *control[0]=0 ; 
     68    MPI_Win_unlock(0, windows[0]) ; 
     69 
     70    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ; 
     71    *firstTimeLine[1]=0 ; 
     72    *bufferCount[1]=0 ; 
     73    *control[1]=0 ; 
     74    MPI_Win_unlock(0, windows[1]) ; 
     75    winState[0]=false ; 
     76    winState[1]=false ; 
     77    MPI_Barrier(oneSidedComm) ; 
     78    hasWindows=true ; 
     79  } 
     80 
     81  void CClientBuffer::freeWindows() 
     82  { 
     83    if (hasWindows) 
     84    { 
     85      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ; 
     86      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ; 
     87      *control[0]=2 ; 
     88      *control[1]=2 ; 
     89      MPI_Win_unlock(0, windows[1]) ; 
     90      MPI_Win_unlock(0, windows[0]) ; 
     91       
     92      MPI_Win_free(&windows[0]) ; 
     93      MPI_Win_free(&windows[1]) ; 
     94      hasWindows=false ; 
     95    } 
     96  } 
     97  
     98  void CClientBuffer::lockBuffer(void) 
     99  { 
     100    if (hasWindows) 
     101    { 
     102      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[current]) ; 
     103      winState[current]=true ; 
     104    } 
     105  } 
     106 
     107  void CClientBuffer::unlockBuffer(void) 
     108  { 
     109    if (hasWindows) 
     110    { 
     111      MPI_Win_unlock(0, windows[current]) ; 
     112      winState[current]=false ; 
     113    } 
    37114  } 
    38115 
     
    44121  bool CClientBuffer::isBufferFree(StdSize size) 
    45122  { 
     123    bool loop=true ; 
     124    while (loop)  
     125    { 
     126      lockBuffer(); 
     127      if (*control[current]==0) loop=false ; // attemp to read from server ? 
     128      else unlockBuffer() ; 
     129    } 
     130     
    46131    if (size > bufferSize) 
    47132      ERROR("bool CClientBuffer::isBufferFree(StdSize size)", 
     
    59144    } 
    60145 
    61  
    62     return (size <= remain() && bufferedEvents < maxBufferedEvents); 
    63   } 
    64  
    65  
    66   CBufferOut* CClientBuffer::getBuffer(StdSize size) 
     146      count=*bufferCount[current] ; 
     147      return (size <= remain()); 
     148  } 
     149 
     150 
     151  CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size) 
    67152  { 
    68153    if (size <= remain()) 
    69154    { 
     155      info(100)<<"count "<<count<<"   bufferCount[current]  "<<*bufferCount[current]<<endl ; 
    70156      retBuffer->realloc(buffer[current] + count, size); 
    71157      count += size; 
    72       bufferedEvents++; 
     158      if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ; 
     159      *bufferCount[current]=count ; 
    73160      return retBuffer; 
    74161    } 
     
    81168  } 
    82169 
    83   bool CClientBuffer::checkBuffer(void) 
     170  bool CClientBuffer::checkBuffer(bool send) 
    84171  { 
    85172    MPI_Status status; 
     
    96183    if (!pending) 
    97184    { 
     185      if (!send) return false ; 
    98186      if (count > 0) 
    99187      { 
    100         MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 
    101         pending = true; 
    102         if (current == 1) current = 0; 
    103         else current = 1; 
    104         count = 0; 
    105         bufferedEvents = 0; 
     188        lockBuffer() ; 
     189        if (*control[current]==0 && bufferCount[current] > 0) 
     190        { 
     191          MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 
     192          pending = true; 
     193          *control[current]=0 ; 
     194          *firstTimeLine[current]=0 ; 
     195          *bufferCount[current]=0 ; 
     196 
     197           unlockBuffer() ; 
     198 
     199          if (current == 1) current = 0; 
     200          else current = 1; 
     201          count = 0; 
     202        } 
     203        else unlockBuffer() ; 
    106204      } 
    107205    } 
     
    112210  bool CClientBuffer::hasPendingRequest(void) 
    113211  { 
     212    
     213    lockBuffer() ; 
     214    count=*bufferCount[current] ; 
     215    unlockBuffer() ; 
     216 
    114217    return (pending || count > 0); 
    115218  } 
     219 
     220 
    116221} 
Note: See TracChangeset for help on using the changeset viewer.