Changeset 1757


Timestamp:
10/18/19 14:55:57
Author:
ymipsl
Message:

Implement one-sided communication in the client/server protocol to avoid deadlocks when some buffers are full.

YM
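
The core of the change is a software lock stored in the third word of each buffer header and acquired with MPI_Compare_and_swap inside an exclusive passive-target epoch (see CClientBuffer::lockBuffer/unlockBuffer and the matching CServerBuffer routines in the diff below). The following is a minimal standalone sketch of that idiom, not XIOS code; names such as lockRemoteBuffer and headerAddr are illustrative only.

    // Sketch of the CAS-based spin lock this changeset introduces: the lock word
    // is the third size_t of a buffer header exposed through a dynamic MPI window.
    #include <mpi.h>
    #include <cstddef>

    // 'win' is a dynamic window the header is attached to, 'headerAddr' the
    // MPI_Aint address of the header on 'targetRank' (both hypothetical here).
    void lockRemoteBuffer(MPI_Win win, int targetRank, MPI_Aint headerAddr)
    {
      long long int lock = 1, zero = 0, one = 1;
      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, targetRank, 0, win);
      while (lock != 0)   // spin until the control word flips from 0 to 1
      {
        MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, targetRank,
                             MPI_Aint_add(headerAddr, 2 * sizeof(size_t)), win);
        MPI_Win_flush(targetRank, win);
      }
    }

    void unlockRemoteBuffer(MPI_Win win, int targetRank, MPI_Aint headerAddr)
    {
      long long int lock = 1, zero = 0, one = 1;
      // Swap the control word back from 1 to 0, then close the epoch.
      MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, targetRank,
                           MPI_Aint_add(headerAddr, 2 * sizeof(size_t)), win);
      MPI_Win_flush(targetRank, win);
      MPI_Win_unlock(targetRank, win);
    }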

Location:
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src
Files:
9 edited

  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_client.cpp

    r1639 r1757  
    1212  size_t CClientBuffer::maxRequestSize = 0; 
    1313 
    14   CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize, StdSize maxBufferedEvents) 
     14  CClientBuffer::CClientBuffer(MPI_Comm interComm, vector<MPI_Win>& windows, int clientRank, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize) 
    1515    : interComm(interComm) 
     16    , clientRank_(clientRank) 
    1617    , serverRank(serverRank) 
    1718    , bufferSize(bufferSize) 
     
    2021    , current(0) 
    2122    , count(0) 
    22     , bufferedEvents(0) 
    23     , maxBufferedEvents(maxBufferedEvents) 
    2423    , pending(false) 
    25   { 
    26     buffer[0] = new char[bufferSize]; // transform it with MPI_ALLOC_MEM later 
    27     buffer[1] = new char[bufferSize]; 
     24    , hasWindows(false)  
     25    , windows_(windows) 
     26  { 
     27    if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ; 
     28    else hasWindows=true ; 
     29 
     30      MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ; 
     31      MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ; 
     32      buffer[0] = bufferHeader[0]+headerSize ; 
     33      buffer[1] = bufferHeader[1]+headerSize ; 
     34      firstTimeLine[0]=(size_t*)bufferHeader[0] ; 
     35      firstTimeLine[1]=(size_t*)bufferHeader[1] ; 
     36      bufferCount[0]=(size_t*)bufferHeader[0] +1 ; 
     37      bufferCount[1]=(size_t*)bufferHeader[1] +1 ; 
     38      control[0]=(size_t*)bufferHeader[0] +2 ; 
     39      control[1]=(size_t*)bufferHeader[1] +2 ; 
     40      finalize[0]=(size_t*)bufferHeader[0] +3 ; 
     41      finalize[1]=(size_t*)bufferHeader[1] +3 ; 
     42 
     43      *firstTimeLine[0]=0 ; 
     44      *firstTimeLine[1]=0 ; 
     45      *bufferCount[0]=0 ; 
     46      *bufferCount[1]=0 ; 
     47      *control[0]=0 ; 
     48      *control[1]=0 ; 
     49      *finalize[0]=0 ; 
     50      *finalize[1]=0 ; 
     51      winState[0]=false ; 
     52      winState[1]=false ; 
     53 
     54 
     55    if (hasWindows) 
     56    {   
     57     
     58      MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize) ; 
     59      MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize) ; 
     60     
     61      MPI_Group group ; 
     62      int groupSize,groupRank ; 
     63      MPI_Win_get_group(windows_[0], &group) ; 
     64      MPI_Group_size(group, &groupSize) ; 
     65      MPI_Group_rank(group, &groupRank) ; 
     66      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 
     67 
     68      MPI_Win_get_group(windows_[1], &group) ; 
     69      MPI_Group_size(group, &groupSize) ; 
     70      MPI_Group_rank(group, &groupRank) ; 
     71      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank); 
     72 
     73      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 
     74      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 
     75 
     76      MPI_Win_unlock(clientRank_, windows_[1]) ; 
     77      MPI_Win_unlock(clientRank_, windows_[0]) ; 
     78    }  
    2879    retBuffer = new CBufferOut(buffer[current], bufferSize); 
    29     info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << " with a maximum of " << maxBufferedEvents << " buffered events" << endl; 
     80    info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl; 
     81  } 
     82 
     83  MPI_Aint CClientBuffer::getWinAddress(int i) 
     84  { 
     85     MPI_Aint address ; 
     86      
     87     if (hasWindows) MPI_Get_address(bufferHeader[i], &address) ; 
     88     else address=0 ; 
     89 
     90     return address ; 
    3091  } 
    3192 
    3293  CClientBuffer::~CClientBuffer() 
    3394  { 
    34    delete [] buffer[0]; 
    35    delete [] buffer[1]; 
    36    delete retBuffer; 
     95     //freeWindows() ; 
     96     if (hasWindows) 
     97     { 
     98       MPI_Win_detach(windows_[0],bufferHeader[0]); 
     99       MPI_Win_detach(windows_[1],bufferHeader[1]); 
     100       MPI_Free_mem(bufferHeader[0]) ; 
     101       MPI_Free_mem(bufferHeader[1]) ; 
     102     } 
     103     delete retBuffer; 
     104  } 
     105 
     106/*  void CClientBuffer::createWindows(MPI_Comm oneSidedComm) 
     107  { 
     108    MPI_Barrier(oneSidedComm) ; 
     109    MPI_Win_create(bufferHeader[0], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ; 
     110    MPI_Win_create(bufferHeader[1], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ; 
     111 
     112    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ; 
     113    *firstTimeLine[0]=0 ; 
     114    *bufferCount[0]=0 ; 
     115    *control[0]=0 ; 
     116    MPI_Win_unlock(0, windows[0]) ; 
     117 
     118    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ; 
     119    *firstTimeLine[1]=0 ; 
     120    *bufferCount[1]=0 ; 
     121    *control[1]=0 ; 
     122    MPI_Win_unlock(0, windows[1]) ; 
     123    winState[0]=false ; 
     124    winState[1]=false ; 
     125    MPI_Barrier(oneSidedComm) ; 
     126    hasWindows=true ; 
     127  } 
     128*/ 
     129 
     130/*   
     131  void CClientBuffer::freeWindows() 
     132  { 
     133    if (hasWindows) 
     134    { 
     135      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[0]) ; 
     136      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[1]) ; 
     137      *control[0]=2 ; 
     138      *control[1]=2 ; 
     139      MPI_Win_unlock(0, windows_[1]) ; 
     140      MPI_Win_unlock(0, windows_[0]) ; 
     141       
     142      MPI_Win_free(&windows_[0]) ; 
     143      MPI_Win_free(&windows_[1]) ; 
     144      hasWindows=false ; 
     145    } 
     146  } 
     147*/  
     148  void CClientBuffer::lockBuffer(void) 
     149  { 
     150    if (hasWindows) 
     151    { 
     152   //   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[current]) ; 
     153      long long int lock=1 ; 
     154      long long int zero=0, one=1 ; 
     155      
     156      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ; 
     157      
     158      while(lock!=0) 
     159      { 
     160        MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)), 
     161                             windows_[current]) ; 
     162        MPI_Win_flush(clientRank_, windows_[current]) ; 
     163      } 
     164 
     165//      info(100)<<"Buffer locked "<<&windows_<<"  "<<current<<endl ; 
     166      winState[current]=true ; 
     167    } 
     168  } 
     169 
     170  void CClientBuffer::unlockBuffer(void) 
     171  { 
     172    if (hasWindows) 
     173    { 
     174      long long int lock=1 ; 
     175      long long int zero=0, one=1 ; 
     176 
     177      MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)), 
     178                             windows_[current]) ; 
     179      MPI_Win_unlock(clientRank_, windows_[current]) ; 
     180 
     181 //     info(100)<<"Buffer unlocked "<<&windows_<<"  "<<current<<endl ; 
     182      winState[current]=false ; 
     183    } 
    37184  } 
    38185 
     
    44191  bool CClientBuffer::isBufferFree(StdSize size) 
    45192  { 
     193//    bool loop=true ; 
     194//    while (loop)  
     195//    { 
     196//      lockBuffer(); 
     197//      if (*control[current]==0) loop=false ; // attemp to read from server ? 
     198//      else unlockBuffer() ; 
     199//    } 
     200   
     201    lockBuffer(); 
    46202    if (size > bufferSize) 
    47203      ERROR("bool CClientBuffer::isBufferFree(StdSize size)", 
     
    59215    } 
    60216 
    61  
    62     return (size <= remain() && bufferedEvents < maxBufferedEvents); 
    63   } 
    64  
    65  
    66   CBufferOut* CClientBuffer::getBuffer(StdSize size) 
     217      count=*bufferCount[current] ; 
     218      return (size <= remain()); 
     219  } 
     220 
     221 
     222  CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size) 
    67223  { 
    68224    if (size <= remain()) 
     
    70226      retBuffer->realloc(buffer[current] + count, size); 
    71227      count += size; 
    72       bufferedEvents++; 
     228      if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ; 
     229      *bufferCount[current]=count ; 
     230/*      info(50)<<"CClientBuffer::getBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current 
     231              <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ; 
     232      if (!winState[current]) info(40)<<"CClientBuffer::getBuffer "<<" Windows Not Locked... "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current 
     233              <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ;*/ 
    73234      return retBuffer; 
    74235    } 
     
    81242  } 
    82243 
    83   bool CClientBuffer::checkBuffer(void) 
     244  void CClientBuffer::infoBuffer(void) 
     245  { 
     246       
     247      char checksum=0 ; 
     248      for(size_t i=0;i<*bufferCount[current];i++) checksum=checksum+buffer[current][i] ; 
     249  
     250      char checksumFirst=0 ; 
     251      for(size_t i=5; i<10 && i<*bufferCount[current] ;i++) checksumFirst=checksumFirst+buffer[current][i] ; 
     252  
     253      char checksumLast=0 ; 
     254      for(size_t i=(*bufferCount[current]<10)?0:*bufferCount[current]-10; i<*bufferCount[current] ; i++) checksumLast=checksumLast+buffer[current][i] ; 
     255  
     256      info(45)<<"CClientBuffer::infoBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current<<" WinState "<<winState[current] 
     257              <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<" checksum "<<(int)checksum<<" " 
     258              <<(int)buffer[current][0]<<" "<<(int)buffer[current][1]<<" "<<(int)buffer[current][2]<<" "<<(int)buffer[current][3]<<" "<<(int)buffer[current][4]<<" "<<(int)buffer[current][5]<<" " 
     259              <<(int)buffer[current][6]<<" "<<(int)buffer[current][7]<<" "<<(int)buffer[current][8]<<" "<<(int)buffer[current][9]<<" "<<(int)buffer[current][10]<<" "<<(int)buffer[current][11]<<endl ; 
     260 
     261  } 
     262 
     263  bool CClientBuffer::checkBuffer(bool send) 
    84264  { 
    85265    MPI_Status status; 
     
    96276    if (!pending) 
    97277    { 
     278      if (!send) return false ; 
    98279      if (count > 0) 
    99280      { 
    100         MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 
    101         pending = true; 
    102         if (current == 1) current = 0; 
    103         else current = 1; 
    104         count = 0; 
    105         bufferedEvents = 0; 
     281        lockBuffer() ; 
     282 //       if (*control[current]==0 && bufferCount[current] > 0) 
     283        if (*bufferCount[current] > 0) 
     284        { 
     285          MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 
     286          pending = true; 
     287//          *control[current]=0 ; 
     288          *firstTimeLine[current]=0 ; 
     289          *bufferCount[current]=0 ; 
     290 
     291           unlockBuffer() ; 
     292 
     293          if (current == 1) current = 0; 
     294          else current = 1; 
     295          count = 0; 
     296        } 
     297        else unlockBuffer() ; 
    106298      } 
    107299    } 
     
    112304  bool CClientBuffer::hasPendingRequest(void) 
    113305  { 
     306    
     307    lockBuffer() ; 
     308    count=*bufferCount[current] ; 
     309    unlockBuffer() ; 
     310 
    114311    return (pending || count > 0); 
    115312  } 
     313 
     314  bool CClientBuffer::isNotifiedFinalized(void) 
     315  { 
     316    
     317    bool ret ; 
     318    lockBuffer() ; 
     319    ret=*finalize[current] == 1 ? true : false ; 
     320    unlockBuffer() ; 
     321 
     322    return ret; 
     323  } 
     324 
    116325} 
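
For reference, the new CClientBuffer constructor lays out each of the two client buffers as a four-word size_t header (firstTimeLine, bufferCount, control/lock, finalize) followed by the payload area, allocated with MPI_Alloc_mem and attached to a dynamic window. Below is a hedged sketch of that layout; ExposedBuffer and allocateAndAttach are hypothetical names, not part of XIOS.

    // Sketch (illustrative, not the XIOS source) of the header layout the new
    // constructor exposes through each dynamic window.
    #include <mpi.h>
    #include <cstddef>

    struct ExposedBuffer          // hypothetical wrapper for one of the two buffers
    {
      char*   raw;                // headerSize + bufferSize bytes from MPI_Alloc_mem
      size_t* firstTimeLine;      // word 0: timeline of the first buffered event
      size_t* bufferCount;        // word 1: number of bytes currently buffered
      size_t* control;            // word 2: lock word targeted by MPI_Compare_and_swap
      size_t* finalize;           // word 3: set to 1 by the server at finalize
      char*   data;               // payload area read by the server with MPI_Get
    };

    ExposedBuffer allocateAndAttach(MPI_Win win, std::size_t bufferSize)
    {
      const std::size_t headerSize = 4 * sizeof(size_t);
      ExposedBuffer b;
      MPI_Alloc_mem(bufferSize + headerSize, MPI_INFO_NULL, &b.raw);
      b.firstTimeLine = reinterpret_cast<size_t*>(b.raw);
      b.bufferCount   = b.firstTimeLine + 1;
      b.control       = b.firstTimeLine + 2;
      b.finalize      = b.firstTimeLine + 3;
      b.data          = b.raw + headerSize;
      *b.firstTimeLine = *b.bufferCount = *b.control = *b.finalize = 0;
      MPI_Win_attach(win, b.raw, bufferSize + headerSize);  // expose to the server
      return b;
    }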
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_client.hpp

    r1639 r1757  
    1414      static size_t maxRequestSize; 
    1515 
    16       CClientBuffer(MPI_Comm intercomm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize, StdSize maxBufferedEvents); 
     16      CClientBuffer(MPI_Comm intercomm, vector<MPI_Win>& windows, int clientRank, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize); 
    1717      ~CClientBuffer(); 
    18  
     18//      void createWindows(MPI_Comm oneSidedComm) ; 
     19      void freeWindows(void) ; 
     20      void lockBuffer(void) ; 
     21      void unlockBuffer(void) ; 
     22       
    1923      bool isBufferFree(StdSize size); 
    20       CBufferOut* getBuffer(StdSize size); 
    21       bool checkBuffer(void); 
     24      CBufferOut* getBuffer(size_t timeLine, StdSize size); 
     25      bool checkBuffer(bool send=false); 
    2226      bool hasPendingRequest(void); 
    2327      StdSize remain(void); 
    24  
     28      MPI_Aint getWinAddress(int numWindows) ; 
     29      void infoBuffer(void) ; 
     30      bool isNotifiedFinalized(void) ; 
    2531    private: 
    2632      char* buffer[2]; 
    27  
     33      char* bufferHeader[2]; 
     34      size_t* firstTimeLine[2] ; 
     35      size_t* bufferCount[2] ; 
     36      size_t* control[2] ; 
     37      size_t* finalize[2] ; 
     38      bool winState[2] ; 
    2839      int current; 
    2940 
    3041      StdSize count; 
    31       StdSize bufferedEvents; 
    3242      StdSize maxEventSize; 
    33       const StdSize maxBufferedEvents; 
    3443      const StdSize bufferSize; 
    3544      const StdSize estimatedMaxEventSize; 
     
    3746 
    3847      const int serverRank; 
     48      const int clientRank_; 
    3949      bool pending; 
    4050 
     
    4353      CBufferOut* retBuffer; 
    4454      const MPI_Comm interComm; 
     55      std::vector<MPI_Win> windows_ ; 
     56      bool hasWindows ; 
     57      static const int headerSize=4*sizeof(size_t); 
    4558  }; 
    4659} 
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_server.cpp

    r885 r1757  
    77{ 
    88 
    9   CServerBuffer::CServerBuffer(StdSize buffSize) 
     9  CServerBuffer::CServerBuffer(vector<MPI_Win>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize buffSize)  
     10  : hasWindows(true), windows_(windows), windowsRank_(windowsRank), winAddress_(winAddress) 
    1011  { 
    1112    size = 3 * buffSize; 
     
    1314    current = 1; 
    1415    end = size; 
     16    used=0 ; 
    1517    buffer = new char[size]; // use MPI_ALLOC_MEM later? 
     18    currentWindows=1 ; 
     19    if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ; 
    1620  } 
    1721 
     
    2125  } 
    2226 
     27  void CServerBuffer::updateCurrentWindows(void) 
     28  { 
     29    if (currentWindows==0) currentWindows=1 ; 
     30    else currentWindows=0 ; 
     31  } 
     32 
     33/* 
     34  void CServerBuffer::createWindows(MPI_Comm oneSidedComm) 
     35  { 
     36    MPI_Barrier(oneSidedComm) ; 
     37    MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ; 
     38    MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ; 
     39    hasWindows=true ; 
     40    updateCurrentWindows() ; 
     41    MPI_Barrier(oneSidedComm) ; 
     42  } 
     43*/ 
     44 
     45/* 
     46  bool CServerBuffer::freeWindows() 
     47  { 
     48    if (hasWindows) 
     49    { 
     50      size_t header[3] ; 
     51      size_t& control=header[2] ; 
     52      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows_[0]) ; 
     53      MPI_Get(&control, 1, MPI_LONG_LONG_INT, windowsRank , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[0]) ; 
     54      MPI_Win_unlock(0,windows[0]) ; 
     55      if (control==2)  // ok for free windows 
     56      { 
     57        MPI_Win_free( &(windows[0])) ; 
     58        MPI_Win_free( &(windows[1])) ; 
     59        hasWindows=false ; 
     60        return true ; 
     61      } 
     62      else return false ; 
     63    } 
     64    else return true ; 
     65  } 
     66*/ 
    2367 
    2468  bool CServerBuffer::isBufferFree(size_t count) 
     
    72116  } 
    73117 
     118  bool CServerBuffer::isBufferEmpty(void) 
     119  { 
     120    if (used==0) return true ; 
     121    else return false; 
     122  } 
    74123 
    75124  void* CServerBuffer::getBuffer(size_t count) 
     
    128177    } 
    129178 
     179    used+=count ; 
    130180    return ret ; 
    131181  } 
     
    167217      } 
    168218    } 
    169   } 
    170  
     219    used-=count ; 
     220  } 
     221 
     222  bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count) 
     223  { 
     224    if (!hasWindows) return false ; 
     225 
     226     
     227    size_t header[3] ; 
     228    size_t& clientTimeline=header[0] ; 
     229    size_t& clientCount=header[1] ; 
     230    size_t& control=header[2] ; 
     231    bool ok=false ; 
     232     
     233    MPI_Group group ; 
     234    int groupSize,groupRank ; 
     235    MPI_Win_get_group(windows_[currentWindows], &group) ; 
     236    MPI_Group_size(group, &groupSize) ; 
     237    MPI_Group_rank(group, &groupRank) ; 
     238     
     239    lockBuffer();  
     240 
     241// lock is acquired 
     242 
     243    MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],0), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     244    MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     245    MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 
     246 
     247//    control=1 ; 
     248//    MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     249    
     250//    MPI_Win_unlock(windowsRank_, windows_[currentWindows]) ; 
     251    MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 
     252//    info(100)<<"getBufferFromClient : windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ; 
     253    if (timeLine==clientTimeline) 
     254    { 
     255//      info(50)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ; 
     256  
     257//      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 
     258      buffer=(char*)getBuffer(clientCount) ; 
     259      count=clientCount ; 
     260      MPI_Get(buffer, clientCount, MPI_CHAR, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],4*sizeof(size_t)) , clientCount, MPI_CHAR, windows_[currentWindows]) ; 
     261      clientTimeline = 0 ; 
     262      clientCount = 0 ; 
     263//      control=0 ; 
     264      MPI_Put(&header[0], 2, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],0) , 2, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     265 
     266// release lock 
     267     unlockBuffer() ; 
     268 
     269      ok=true ; 
     270      char checksum=0 ; 
     271      for(size_t i=0;i<count;i++) checksum=checksum+buffer[i] ; 
     272      char checksumFirst=0 ; 
     273      for(size_t i=5; i<10 && i<count ;i++) checksumFirst=checksumFirst+buffer[i] ; 
     274      char checksumLast=0 ; 
     275      for(size_t i=(count<10)?0:count-10; i<count ; i++) checksumLast=checksumLast+buffer[i] ; 
     276       
     277      info(40)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline " 
     278              <<clientTimeline<<" clientCount "<<count<<" checksum "<<(int)checksum<<" " 
     279              <<(int)buffer[0]<<" "<<(int)buffer[1]<<" "<<(int)buffer[2]<<" "<<(int)buffer[3]<<" "<<(int)buffer[4]<<" "<<(int)buffer[5]<<" "  
     280              <<(int)buffer[6]<<" "<<(int)buffer[7]<<" "<<(int)buffer[8]<<" "<<(int)buffer[9]<<" "<<(int)buffer[10]<<" "<<(int)buffer[11]<<endl ; 
     281 
     282    } 
     283    else 
     284    { 
     285      //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 
     286      //control=0 ; 
     287      //MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     288  
     289 // release lock 
     290      unlockBuffer() ; 
     291    } 
     292 
     293    if (ok) return true ; 
     294 
     295    return false ; 
     296  } 
     297   
     298  void CServerBuffer::lockBuffer(void) 
     299  { 
     300    if (!hasWindows) return ; 
     301 
     302    long long int lock=1 ; 
     303    long long int zero=0, one=1 ; 
     304//    control=1 ; 
     305    MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 
     306    while(lock!=0) 
     307    { 
     308      MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 
     309                           windows_[currentWindows]) ; 
     310      MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 
     311    } 
     312  } 
     313 
     314  void CServerBuffer::unlockBuffer(void) 
     315  { 
     316    if (!hasWindows) return ; 
     317    long long int lock=1 ; 
     318    long long int zero=0, one=1 ; 
     319     
     320    MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 
     321                          windows_[currentWindows]) ; 
     322    MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;  
     323    MPI_Win_unlock(windowsRank_,windows_[currentWindows]) ; 
     324  } 
     325   
     326  void CServerBuffer::notifyClientFinalize(void) 
     327  { 
     328    if (!hasWindows) return ; 
     329    size_t finalize=1 ; 
     330    lockBuffer();  
     331// lock is acquired 
     332    MPI_Put(&finalize, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],3*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     333    unlockBuffer() ; 
     334  } 
    171335} 
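
The server-side pull path (CServerBuffer::getBufferFromClient) locks the client's header, reads the (timeline, count) pair with MPI_Get, and only if the awaited timeline is present pulls the payload and writes a zeroed header back with MPI_Put. A simplified sketch under those assumptions follows; pullEventFromClient is an illustrative name, the lock helpers are the ones sketched after the commit message, and size_t is assumed to be 64-bit as in the original code.

    // Sketch (not the XIOS source) of the server-side one-sided read.
    #include <mpi.h>
    #include <vector>
    #include <cstddef>

    // CAS lock helpers from the earlier sketch.
    void lockRemoteBuffer(MPI_Win win, int targetRank, MPI_Aint headerAddr);
    void unlockRemoteBuffer(MPI_Win win, int targetRank, MPI_Aint headerAddr);

    bool pullEventFromClient(MPI_Win win, int clientRank, MPI_Aint clientHeaderAddr,
                             size_t awaitedTimeLine, std::vector<char>& payload)
    {
      size_t header[2];                       // {clientTimeline, clientCount}
      lockRemoteBuffer(win, clientRank, clientHeaderAddr);

      MPI_Get(header, 2, MPI_LONG_LONG_INT, clientRank,
              clientHeaderAddr, 2, MPI_LONG_LONG_INT, win);
      MPI_Win_flush(clientRank, win);

      if (header[0] != awaitedTimeLine)       // event not (fully) written yet
      {
        unlockRemoteBuffer(win, clientRank, clientHeaderAddr);
        return false;
      }

      payload.resize(header[1]);
      MPI_Get(payload.data(), static_cast<int>(header[1]), MPI_CHAR, clientRank,
              MPI_Aint_add(clientHeaderAddr, 4 * sizeof(size_t)),
              static_cast<int>(header[1]), MPI_CHAR, win);
      header[0] = header[1] = 0;              // mark the buffer as consumed
      MPI_Put(header, 2, MPI_LONG_LONG_INT, clientRank,
              clientHeaderAddr, 2, MPI_LONG_LONG_INT, win);
      unlockRemoteBuffer(win, clientRank, clientHeaderAddr);  // completes the RMA ops
      return true;
    }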
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_server.hpp

    r717 r1757  
    1212  { 
    1313    public: 
    14       CServerBuffer(StdSize bufSize) ; 
     14      CServerBuffer(vector<MPI_Win>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize bufSize) ; 
    1515      ~CServerBuffer() ; 
    1616 
     
    1818      void* getBuffer(size_t count) ; 
    1919      void freeBuffer(size_t count) ; 
    20  
     20      void createWindows(MPI_Comm oneSidedComm) ; 
     21      bool freeWindows(void) ; 
     22      bool getBufferFromClient(size_t timeLine, char* & buffer, size_t& count) ; 
     23      bool isBufferEmpty(void) ; 
     24      void updateCurrentWindows(void) ; 
     25      void lockBuffer(void) ; 
     26      void unlockBuffer(void) ; 
     27      void notifyClientFinalize(void) ; 
    2128    private: 
    2229      char* buffer; 
     
    2532      size_t end; 
    2633      size_t size; 
     34      size_t used ;  // count of element occupied 
     35      std::vector<MPI_Win> windows_ ; 
     36      std::vector<MPI_Aint> winAddress_ ; 
     37 
     38      int currentWindows ; 
     39      bool hasWindows ; 
     40      int windowsRank_ ; 
    2741  }; 
    2842} 
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_client.cpp

    r1639 r1757  
    2424     : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4) 
    2525    { 
     26      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 
     27      if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode 
     28       
    2629      context = parent; 
    2730      intraComm = intraComm_; 
     
    3740      computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader); 
    3841 
    39       timeLine = 0; 
     42      if (flag) MPI_Intercomm_merge(interComm_,false,&interCommMerged) ; 
     43       
     44      if (!isAttachedModeEnabled()) 
     45      {   
     46        windows.resize(serverSize) ; 
     47        MPI_Comm winComm ; 
     48        for(int rank=0; rank<serverSize; rank++) 
     49        { 
     50          windows[rank].resize(2) ; 
     51          MPI_Comm_split(interCommMerged, rank, clientRank, &winComm); 
     52          int myRank ; 
     53          MPI_Comm_rank(winComm,&myRank); 
     54          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][0]); 
     55          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][1]); 
     56          MPI_Comm_free(&winComm) ; 
     57        } 
     58      } 
     59 
     60      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ; 
     61 
     62      timeLine = 1; 
    4063    } 
    4164 
     
    116139        list<int> sizes = event.getSizes(); 
    117140 
    118         // We force the getBuffers call to be non-blocking on classical servers 
     141         // We force the getBuffers call to be non-blocking on classical servers 
    119142        list<CBufferOut*> buffList; 
    120         bool couldBuffer = getBuffers(ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0) )); 
    121 //        bool couldBuffer = getBuffers(ranks, sizes, buffList, CXios::isServer ); 
    122  
    123         if (couldBuffer) 
    124         { 
    125           event.send(timeLine, sizes, buffList); 
    126           info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<"  sent"<<endl ; 
    127  
    128           checkBuffers(ranks); 
    129  
    130           if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 
    131           { 
    132             waitEvent(ranks); 
    133             CContext::setCurrent(context->getId()); 
    134           } 
    135         } 
    136         else 
    137         { 
    138           tmpBufferedEvent.ranks = ranks; 
    139           tmpBufferedEvent.sizes = sizes; 
    140  
    141           for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++) 
    142             tmpBufferedEvent.buffers.push_back(new CBufferOut(*it)); 
    143           info(100)<<"DEBUG : temporaly event created : timeline "<<timeLine<<endl ; 
    144           event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers); 
    145           info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<"  sent"<<endl ; 
     143        getBuffers(timeLine, ranks, sizes, buffList) ; 
     144 
     145        event.send(timeLine, sizes, buffList); 
     146        
     147        //for (auto itRank = ranks.begin(); itRank != ranks.end(); itRank++) buffers[*itRank]->infoBuffer() ; 
     148 
     149        unlockBuffers(ranks) ; 
     150        info(100)<<"Event "<<timeLine<<" of context "<<context->getId()<<"  sent"<<endl ; 
     151           
     152        checkBuffers(ranks); 
     153 
     154        if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 
     155        { 
     156          waitEvent(ranks); 
     157          CContext::setCurrent(context->getId()); 
    146158        } 
    147159      } 
    148160 
    149161      timeLine++; 
    150     } 
    151  
    152     /*! 
    153      * Send the temporarily buffered event (if any). 
    154      * 
    155      * \return true if a temporarily buffered event could be sent, false otherwise  
    156      */ 
    157     bool CContextClient::sendTemporarilyBufferedEvent() 
    158     { 
    159       bool couldSendTmpBufferedEvent = false; 
    160  
    161       if (hasTemporarilyBufferedEvent()) 
    162       { 
    163         list<CBufferOut*> buffList; 
    164         if (getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) // Non-blocking call 
    165         { 
    166           list<CBufferOut*>::iterator it, itBuffer; 
    167  
    168           for (it = tmpBufferedEvent.buffers.begin(), itBuffer = buffList.begin(); it != tmpBufferedEvent.buffers.end(); it++, itBuffer++) 
    169             (*itBuffer)->put((char*)(*it)->start(), (*it)->count()); 
    170  
    171           info(100)<<"DEBUG : temporaly event sent "<<endl ; 
    172           checkBuffers(tmpBufferedEvent.ranks); 
    173  
    174           tmpBufferedEvent.clear(); 
    175  
    176           couldSendTmpBufferedEvent = true; 
    177         } 
    178       } 
    179  
    180       return couldSendTmpBufferedEvent; 
    181162    } 
    182163 
     
    205186     * it is explicitly requested to be non-blocking. 
    206187     * 
     188     * 
     189     * \param [in] timeLine time line of the event which will be sent to servers 
    207190     * \param [in] serverList list of rank of connected server 
    208191     * \param [in] sizeList size of message corresponding to each connection 
     
    211194     * \return whether the already allocated buffers could be used 
    212195    */ 
    213     bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, 
     196    bool CContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, 
    214197                                    bool nonBlocking /*= false*/) 
    215198    { 
     
    236219        areBuffersFree = true; 
    237220        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
     221        { 
    238222          areBuffersFree &= (*itBuffer)->isBufferFree(*itSize); 
     223        } 
    239224 
    240225        if (!areBuffersFree) 
    241226        { 
     227          for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer(); 
    242228          checkBuffers(); 
    243           if (CServer::serverLevel == 0) 
    244             context->server->listen(); 
    245  
     229          if (CServer::serverLevel == 0)  context->server->listen(); 
    246230          else if (CServer::serverLevel == 1) 
    247231          { 
    248232            context->server->listen(); 
    249             for (int i = 0; i < context->serverPrimServer.size(); ++i) 
    250               context->serverPrimServer[i]->listen(); 
     233            for (int i = 0; i < context->serverPrimServer.size(); ++i)  context->serverPrimServer[i]->listen(); 
    251234            CServer::contextEventLoop(false) ; // avoid dead-lock at finalize... 
    252235          } 
    253236 
    254           else if (CServer::serverLevel == 2) 
    255             context->server->listen(); 
     237          else if (CServer::serverLevel == 2) context->server->listen(); 
    256238 
    257239        } 
    258240      } while (!areBuffersFree && !nonBlocking); 
    259  
    260241      CTimer::get("Blocking time").suspend(); 
    261242 
     
    263244      { 
    264245        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
    265           retBuffers.push_back((*itBuffer)->getBuffer(*itSize)); 
    266       } 
    267  
     246          retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize)); 
     247      } 
    268248      return areBuffersFree; 
    269249   } 
     
    281261        maxEventSizes[rank] = CXios::minBufferSize; 
    282262      } 
    283       CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank], maxBufferedEvents); 
     263       
     264      vector<MPI_Win> Wins(2,MPI_WIN_NULL) ; 
     265      if (!isAttachedModeEnabled()) Wins=windows[rank] ; 
     266   
     267      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, Wins, clientRank, rank, mapBufferSize_[rank], maxEventSizes[rank]); 
    284268      // Notify the server 
    285       CBufferOut* bufOut = buffer->getBuffer(sizeof(StdSize)); 
    286       bufOut->put(mapBufferSize_[rank]); // Stupid C++ 
    287       buffer->checkBuffer(); 
     269      CBufferOut* bufOut = buffer->getBuffer(0, 3*sizeof(MPI_Aint)); 
     270      MPI_Aint sendBuff[3] ; 
     271      sendBuff[0]=mapBufferSize_[rank]; // Stupid C++ 
     272      sendBuff[1]=buffers[rank]->getWinAddress(0);  
     273      sendBuff[2]=buffers[rank]->getWinAddress(1);  
     274      info(100)<<"CContextClient::newBuffer : rank "<<rank<<" winAdress[0] "<<buffers[rank]->getWinAddress(0)<<" winAdress[1] "<<buffers[rank]->getWinAddress(1)<<endl; 
     275      bufOut->put(sendBuff, 3); // Stupid C++ 
     276      buffer->checkBuffer(true); 
     277 
     278/* 
     279      if (!isAttachedModeEnabled()) // create windows only in server mode 
     280      { 
     281        MPI_Comm OneSidedInterComm, oneSidedComm ; 
     282        MPI_Intercomm_create(commSelf, 0, interCommMerged, clientSize+rank, 0, &OneSidedInterComm ); 
     283        MPI_Intercomm_merge(OneSidedInterComm,false,&oneSidedComm); 
     284        buffer->createWindows(oneSidedComm) ; 
     285      } 
     286 */       
    288287   } 
    289288 
     
    297296      bool pending = false; 
    298297      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 
    299         pending |= itBuff->second->checkBuffer(); 
     298        pending |= itBuff->second->checkBuffer(!pureOneSided); 
    300299      return pending; 
    301300   } 
     
    307306      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 
    308307      { 
    309           delete itBuff->second; 
     308         delete itBuff->second; 
    310309      } 
    311310      buffers.clear(); 
    312    } 
    313  
     311 
     312/* don't know when release windows 
     313 
     314      if (!isAttachedModeEnabled()) 
     315      {   
     316        for(int rank=0; rank<serverSize; rank++) 
     317        { 
     318          MPI_Win_free(&windows[rank][0]); 
     319          MPI_Win_free(&windows[rank][1]); 
     320        } 
     321      }  
     322   } 
     323*/ 
     324       
     325  /*! 
     326   Lock the buffers for one sided communications 
     327   \param [in] ranks list rank of server to which client connects to 
     328   */ 
     329   void CContextClient::lockBuffers(list<int>& ranks) 
     330   { 
     331      list<int>::iterator it; 
     332      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->lockBuffer(); 
     333   } 
     334 
     335  /*! 
     336   Unlock the buffers for one sided communications 
     337   \param [in] ranks list rank of server to which client connects to 
     338   */ 
     339   void CContextClient::unlockBuffers(list<int>& ranks) 
     340   { 
     341      list<int>::iterator it; 
     342      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->unlockBuffer(); 
     343   } 
     344       
    314345   /*! 
    315346   Verify state of buffers corresponding to a connection 
     
    321352      list<int>::iterator it; 
    322353      bool pending = false; 
    323       for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(); 
     354      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(!pureOneSided); 
    324355      return pending; 
    325356   } 
     
    335366     mapBufferSize_ = mapSize; 
    336367     maxEventSizes = maxEventSize; 
    337  
    338      // Compute the maximum number of events that can be safely buffered. 
    339      double minBufferSizeEventSizeRatio = std::numeric_limits<double>::max(); 
    340      for (std::map<int,StdSize>::const_iterator it = mapSize.begin(), ite = mapSize.end(); it != ite; ++it) 
    341      { 
    342        double ratio = double(it->second) / maxEventSizes[it->first]; 
    343        if (ratio < minBufferSizeEventSizeRatio) minBufferSizeEventSizeRatio = ratio; 
    344      } 
    345      MPI_Allreduce(MPI_IN_PLACE, &minBufferSizeEventSizeRatio, 1, MPI_DOUBLE, MPI_MIN, intraComm); 
    346  
    347      if (minBufferSizeEventSizeRatio < 1.0) 
    348      { 
    349        ERROR("void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)", 
    350              << "The buffer sizes and the maximum events sizes are incoherent."); 
    351      } 
    352      else if (minBufferSizeEventSizeRatio == std::numeric_limits<double>::max()) 
    353        minBufferSizeEventSizeRatio = 1.0; // In this case, maxBufferedEvents will never be used but we want to avoid any floating point exception 
    354  
    355      maxBufferedEvents = size_t(2 * minBufferSizeEventSizeRatio) // there is room for two local buffers on the server 
    356                           + size_t(minBufferSizeEventSizeRatio)  // one local buffer can always be fully used 
    357                           + 1;                                   // the other local buffer might contain only one event 
    358368   } 
    359369 
     
    410420  { 
    411421    map<int,CClientBuffer*>::iterator itBuff; 
     422    std::list<int>::iterator ItServerLeader;  
     423     
    412424    bool stop = false; 
    413425 
    414     CTimer::get("Blocking time").resume(); 
    415     while (hasTemporarilyBufferedEvent()) 
    416     { 
    417       checkBuffers(); 
    418       sendTemporarilyBufferedEvent(); 
    419     } 
    420     CTimer::get("Blocking time").suspend(); 
    421  
     426    int* nbServerConnectionLocal  = new int[serverSize] ; 
     427    int* nbServerConnectionGlobal  = new int[serverSize] ; 
     428    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ; 
     429    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)  nbServerConnectionLocal[itBuff->first]=1 ; 
     430    for (ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++)  nbServerConnectionLocal[*ItServerLeader]=1 ; 
     431     
     432    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm); 
     433     
    422434    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); 
     435    CMessage msg; 
     436 
     437    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ; 
     438    sendEvent(event); 
     439 
     440    delete[] nbServerConnectionLocal ; 
     441    delete[] nbServerConnectionGlobal ; 
     442/*     
    423443    if (isServerLeader()) 
    424444    { 
     
    433453    } 
    434454    else sendEvent(event); 
     455*/ 
    435456 
    436457    CTimer::get("Blocking time").resume(); 
    437 //    while (!stop) 
    438     { 
    439       checkBuffers(); 
    440       if (hasTemporarilyBufferedEvent()) 
    441         sendTemporarilyBufferedEvent(); 
    442  
    443       stop = true; 
    444 //      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest(); 
    445     } 
     458    checkBuffers(); 
    446459    CTimer::get("Blocking time").suspend(); 
    447460 
     
    472485    return pending; 
    473486  } 
    474  
     487   
     488  bool CContextClient::isNotifiedFinalized(void) 
     489  { 
     490    if (isAttachedModeEnabled()) return true ; 
     491 
     492    bool finalized = true; 
     493    map<int,CClientBuffer*>::iterator itBuff; 
     494    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 
     495      finalized &= itBuff->second->isNotifiedFinalized(); 
     496    return finalized; 
     497  } 
    475498 
    476499} 
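
On the client side, the constructor now builds two dynamic windows per server rank by splitting the merged intra-communicator, so that all clients share one communicator with each server process. A sketch of that setup is given below; createClientWindows is a hypothetical wrapper around the loop shown in the diff, with error handling omitted.

    // Sketch (illustrative) of the per-server window creation in CContextClient.
    #include <mpi.h>
    #include <vector>

    std::vector<std::vector<MPI_Win>> createClientWindows(MPI_Comm interCommMerged,
                                                          int clientRank, int serverSize)
    {
      std::vector<std::vector<MPI_Win>> windows(serverSize,
                                                std::vector<MPI_Win>(2, MPI_WIN_NULL));
      for (int rank = 0; rank < serverSize; ++rank)
      {
        MPI_Comm winComm;
        // Color by server rank: every client plus that server land in winComm.
        MPI_Comm_split(interCommMerged, rank, clientRank, &winComm);
        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][0]);
        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][1]);
        MPI_Comm_free(&winComm);
      }
      return windows;
    }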
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_client.hpp

    r1639 r1757  
    3131      // Send event to server 
    3232      void sendEvent(CEventClient& event); 
    33       bool sendTemporarilyBufferedEvent(); 
    3433      void waitEvent(list<int>& ranks); 
    3534 
    3635      // Functions to set/get buffers 
    37       bool getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false); 
     36      bool getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false); 
    3837      void newBuffer(int rank); 
    3938      bool checkBuffers(list<int>& ranks); 
     
    4847 
    4948      bool isAttachedModeEnabled() const; 
    50       bool hasTemporarilyBufferedEvent() const { return !tmpBufferedEvent.isEmpty(); }; 
    5149 
    5250      static void computeLeader(int clientRank, int clientSize, int serverSize, 
     
    5654      // Close and finalize context client 
    5755//      void closeContext(void);  Never been implemented. 
     56      bool isNotifiedFinalized(void) ; 
    5857      void finalize(void); 
    5958 
     
    7170      int serverSize; //!< Size of server group 
    7271 
    73       MPI_Comm interComm; //!< Communicator of server group 
     72      MPI_Comm interComm; //!< Communicator of server group (interCommunicator) 
     73 
     74      MPI_Comm interCommMerged; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication. 
    7475 
    7576      MPI_Comm intraComm; //!< Communicator of client group 
    7677 
     78      MPI_Comm commSelf; //!< Communicator of the client alone. Needed to create a new communicator between 1 proc client and 1 proc server for one sided communication 
     79 
    7780      map<int,CClientBuffer*> buffers; //!< Buffers for connection to servers 
    7881 
     82      bool pureOneSided ; //!< if true, client will communicated with servers only trough one sided communication. Otherwise the hybrid mode P2P /One sided is used. 
     83 
    7984    private: 
     85      void lockBuffers(list<int>& ranks) ; 
     86      void unlockBuffers(list<int>& ranks) ; 
     87       
    8088      //! Mapping of server and buffer size for each connection to server 
    8189      std::map<int,StdSize> mapBufferSize_; 
     
    8492      //! Maximum number of events that can be buffered 
    8593      StdSize maxBufferedEvents; 
    86  
    87       struct { 
    88         std::list<int> ranks, sizes; 
    89         std::list<CBufferOut*> buffers; 
    90  
    91         bool isEmpty() const { return ranks.empty(); }; 
    92         void clear() { 
    93           ranks.clear(); 
    94           sizes.clear(); 
    95  
    96           for (std::list<CBufferOut*>::iterator it = buffers.begin(); it != buffers.end(); it++) 
    97             delete *it; 
    98  
    99           buffers.clear(); 
    100         }; 
    101       } tmpBufferedEvent; //! Event temporarily buffered (used only on the server) 
    10294 
    10395      //! Context for server (Only used in attached mode) 
     
    110102      std::list<int> ranksServerNotLeader; 
    111103 
     104      std::vector<std::vector<MPI_Win> >windows ; //! one sided mpi windows to expose client buffers to servers == windows[nbServers][2] 
     105 
     106 
    112107  }; 
    113108} 
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_server.cpp

    r1639 r1757  
    3333    int flag; 
    3434    MPI_Comm_test_inter(interComm,&flag); 
     35 
     36    if (flag) attachedMode=false ; 
     37    else  attachedMode=true ; 
     38     
    3539    if (flag) MPI_Comm_remote_size(interComm,&commSize); 
    3640    else  MPI_Comm_size(interComm,&commSize); 
    3741 
    38     currentTimeLine=0; 
     42      
     43    currentTimeLine=1; 
    3944    scheduled=false; 
    4045    finished=false; 
     
    4449    else 
    4550      hashId=hashString(context->getId()); 
    46   } 
    47  
     51 
     52    if (!isAttachedModeEnabled()) 
     53    { 
     54      MPI_Intercomm_merge(interComm_,true,&interCommMerged) ; 
     55// create windows for one sided comm 
     56      int interCommMergedRank; 
     57      MPI_Comm winComm ; 
     58      MPI_Comm_rank(intraComm, &interCommMergedRank); 
     59      windows.resize(2) ; 
     60      for(int rank=commSize; rank<commSize+intraCommSize; rank++) 
     61      { 
     62        if (rank==commSize+interCommMergedRank)  
     63        { 
     64          MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm); 
     65          int myRank ; 
     66          MPI_Comm_rank(winComm,&myRank); 
     67          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[0]); 
     68          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]);       
     69        } 
     70        else MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm); 
     71        MPI_Comm_free(&winComm) ; 
     72      } 
     73    } 
     74    else  
     75    { 
     76      windows.resize(2) ; 
     77      windows[0]=MPI_WIN_NULL ; 
     78      windows[1]=MPI_WIN_NULL ; 
     79    } 
     80 
     81 
     82     
     83    MPI_Comm_split(intraComm_,intraCommRank,intraCommRank, &commSelf) ; 
     84    itLastTimeLine=lastTimeLine.begin() ; 
     85 
     86    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 
     87    if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode 
     88       
     89  } 
     90 
     91//! Attached mode is used ? 
     92//! \return true if attached mode is used, false otherwise 
     93  bool CContextServer::isAttachedModeEnabled() const 
     94  { 
     95    return attachedMode ; 
     96  } 
     97   
    4898  void CContextServer::setPendingEvent(void) 
    4999  { 
     
    65115    listen(); 
    66116    checkPendingRequest(); 
    67     if (enableEventsProcessing) 
    68       processEvents(); 
     117    if (enableEventsProcessing)  processEvents(); 
    69118    return finished; 
    70119  } 
     
    117166    if (it==buffers.end()) // Receive the buffer size and allocate the buffer 
    118167    { 
    119        StdSize buffSize = 0; 
    120        MPI_Recv(&buffSize, 1, MPI_LONG, rank, 20, interComm, &status); 
     168       MPI_Aint recvBuff[3] ; 
     169       MPI_Recv(recvBuff, 3, MPI_AINT, rank, 20, interComm, &status); 
     170       StdSize buffSize = recvBuff[0]; 
     171       vector<MPI_Aint> winAdress(2) ; 
     172       winAdress[0]=recvBuff[1] ; winAdress[1]=recvBuff[2] ; 
    121173       mapBufferSize_.insert(std::make_pair(rank, buffSize)); 
    122        it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first; 
     174       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first; 
     175      /* 
     176       if (!isAttachedModeEnabled()) 
     177       { 
     178         MPI_Comm OneSidedInterComm, oneSidedComm ; 
     179         MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0, &OneSidedInterComm ); 
     180         MPI_Intercomm_merge(OneSidedInterComm,true,&oneSidedComm); 
     181         buffers[rank]->createWindows(oneSidedComm) ; 
     182       } 
     183       */ 
     184       lastTimeLine[rank]=0 ; 
     185       itLastTimeLine=lastTimeLine.begin() ; 
     186 
    123187       return true; 
    124188    } 
     
    157221      if (flag==true) 
    158222      { 
     223        buffers[rank]->updateCurrentWindows() ; 
    159224        recvRequest.push_back(rank); 
    160225        MPI_Get_count(&status,MPI_CHAR,&count); 
     
    170235  } 
    171236 
     237  void CContextServer::getBufferFromClient(size_t timeLine) 
     238  { 
     239    if (!isAttachedModeEnabled()) // one sided desactivated in attached mode 
     240    {   
     241      int rank ; 
     242      char *buffer ; 
     243      size_t count ;  
     244 
     245      if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ; 
     246      for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine) 
     247      { 
     248        rank=itLastTimeLine->first ; 
     249        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0) 
     250        { 
     251          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) 
     252          { 
     253            processRequest(rank, buffer, count); 
     254            break ; 
     255          } 
     256        } 
     257      } 
     258    } 
     259  } 
     260          
     261        
    172262  void CContextServer::processRequest(int rank, char* buff,int count) 
    173263  { 
     
    176266    char* startBuffer,endBuffer; 
    177267    int size, offset; 
    178     size_t timeLine; 
     268    size_t timeLine=0; 
    179269    map<size_t,CEventServer*>::iterator it; 
    180270 
     271     
    181272    CTimer::get("Process request").resume(); 
    182273    while(count>0) 
     
    185276      CBufferIn newBuffer(startBuffer,buffer.remain()); 
    186277      newBuffer>>size>>timeLine; 
    187  
    188278      it=events.find(timeLine); 
    189279      if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first; 
     
    193283      count=buffer.remain(); 
    194284    } 
     285 
     286    if (timeLine>0) lastTimeLine[rank]=timeLine ; 
     287     
    195288    CTimer::get("Process request").suspend(); 
    196289  } 
     
    230323        } 
    231324      } 
    232     } 
     325      else getBufferFromClient(currentTimeLine) ; 
     326    } 
     327    else if (pureOneSided) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line 
    233328  } 
    234329 
     
    237332    map<int,CServerBuffer*>::iterator it; 
    238333    for(it=buffers.begin();it!=buffers.end();++it) delete it->second; 
     334  } 
     335 
     336  void CContextServer::releaseBuffers() 
     337  { 
     338    map<int,CServerBuffer*>::iterator it; 
     339    bool out ; 
     340    do 
     341    { 
     342      out=true ; 
     343      for(it=buffers.begin();it!=buffers.end();++it) 
     344      { 
     345//        out = out && it->second->freeWindows() ; 
     346 
     347      } 
     348    } while (! out) ;  
     349  } 
     350 
     351  void CContextServer::notifyClientsFinalize(void) 
     352  { 
     353    for(auto it=buffers.begin();it!=buffers.end();++it) 
     354    { 
     355      it->second->notifyClientFinalize() ; 
     356    } 
    239357  } 
    240358 
     
    254372      finished=true; 
    255373      info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl; 
     374//      releaseBuffers() ; 
     375      notifyClientsFinalize() ; 
    256376      context->finalize(); 
     377 
     378/* don't know where release windows 
     379      MPI_Win_free(&windows[0]) ; 
     380      MPI_Win_free(&windows[1]) ; 
     381*/      
    257382      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 
    258383                           iteMap = mapBufferSize_.end(), itMap; 
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_server.hpp

    r1639 r1757  
    1919    bool listenPendingRequest(MPI_Status& status) ; 
    2020    void checkPendingRequest(void) ; 
     21    void getBufferFromClient(size_t timeLine) ; 
    2122    void processRequest(int rank, char* buff,int count) ; 
    2223    void processEvents(void) ; 
     
    2526    void setPendingEvent(void) ; 
    2627    bool hasPendingEvent(void) ; 
    27  
     28    bool isAttachedModeEnabled() const; 
     29    void releaseBuffers(void) ; 
     30    void notifyClientsFinalize(void) ; 
     31     
    2832    MPI_Comm intraComm ; 
    2933    int intraCommSize ; 
     
    3337    int commSize ; 
    3438 
     39    MPI_Comm interCommMerged; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication. 
     40 
     41    MPI_Comm commSelf; //!< Communicator of the server alone. Needed to create a new communicator between 1 proc client and 1 proc server for one sided communication 
     42 
    3543    map<int,CServerBuffer*> buffers ; 
     44    map<int,size_t> lastTimeLine ; //!< last event time line for a processed request 
     45    map<int,size_t>::iterator itLastTimeLine ; //!< iterator on lastTimeLine 
    3646    map<int,MPI_Request> pendingRequest ; 
    3747    map<int,char*> bufferRequest ; 
     
    4454    bool pendingEvent ; 
    4555    bool scheduled  ;    /*!< event of current timeline is alreading scheduled ? */ 
     56    bool attachedMode ;  //! true if attached mode is enabled otherwise false 
     57    bool pureOneSided ; //!< if true, client will communicated with servers only trough one sided communication. Otherwise the hybrid mode P2P /One sided is used. 
     58          
    4659    size_t hashId ; 
    4760 
     
    5063    private: 
    5164      std::map<int, StdSize> mapBufferSize_; 
     65      vector<MPI_Win> windows ; //! one sided mpi windows to expose client buffers to servers ; No memory will be attached on server side. 
     66 
    5267  } ; 
    5368 
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/node/context.cpp

    r1639 r1757  
    429429    { 
    430430      client->checkBuffers(); 
    431       bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent(); 
    432       if (hasTmpBufferedEvent) 
    433         hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent(); 
    434       // Don't process events if there is a temporarily buffered event 
    435       return server->eventLoop(!hasTmpBufferedEvent || !enableEventsProcessing); 
     431      return server->eventLoop(true); 
    436432    } 
    437433    else if (CServer::serverLevel == 1) 
    438434    { 
    439       if (!finalized) 
    440         client->checkBuffers(); 
     435      if (!finalized) client->checkBuffers(); 
    441436      bool serverFinished = true; 
    442       if (!finalized) 
    443         serverFinished = server->eventLoop(enableEventsProcessing); 
     437      if (!finalized) serverFinished = server->eventLoop(enableEventsProcessing); 
    444438      bool serverPrimFinished = true; 
    445439      for (int i = 0; i < clientPrimServer.size(); ++i) 
    446440      { 
    447         if (!finalized) 
    448           clientPrimServer[i]->checkBuffers(); 
    449         if (!finalized) 
    450           serverPrimFinished *= serverPrimServer[i]->eventLoop(enableEventsProcessing); 
     441        if (!finalized) clientPrimServer[i]->checkBuffers(); 
     442        if (!finalized) serverPrimFinished *= serverPrimServer[i]->eventLoop(enableEventsProcessing); 
    451443      } 
    452444      return ( serverFinished && serverPrimFinished); 
     
    484476         ++countChildCtx_; 
    485477 
     478         info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 
    486479         client->finalize(); 
    487          while (client->havePendingRequests()) 
    488             client->checkBuffers(); 
    489  
     480         info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 
     481         while (client->havePendingRequests()) client->checkBuffers(); 
     482          
     483         info(100)<<"DEBUG: context "<<getId()<<" no pending request ok"<<endl ; 
    490484         while (!server->hasFinished()) 
    491485           server->eventLoop(); 
     486        info(100)<<"DEBUG: context "<<getId()<<" server has finished"<<endl ; 
     487         
     488        bool notifiedFinalized=false ; 
     489        do 
     490        { 
     491          notifiedFinalized=client->isNotifiedFinalized() ; 
     492        } while (!notifiedFinalized) ; 
     493        client->releaseBuffers(); 
    492494 
    493495         if (hasServer) // Mode attache 
     
    499501 
    500502         //! Deallocate client buffers 
    501          client->releaseBuffers(); 
    502  
     503//         client->releaseBuffers(); 
     504        info(100)<<"DEBUG: context "<<getId()<<" release client ok"<<endl ; 
    503505         //! Free internally allocated communicators 
    504506         for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 
     
    515517       if (countChildCtx_ == 0) 
    516518         for (int i = 0; i < clientPrimServer.size(); ++i) 
     519         { 
    517520           clientPrimServer[i]->finalize(); 
     521           bool bufferReleased; 
     522           do 
     523           { 
     524             clientPrimServer[i]->checkBuffers(); 
     525             bufferReleased = !clientPrimServer[i]->havePendingRequests(); 
     526           } while (!bufferReleased); 
     527            
     528           bool notifiedFinalized=false ; 
     529           do 
     530           { 
     531//             clientPrimServer[i]->checkBuffers(); 
     532             notifiedFinalized=clientPrimServer[i]->isNotifiedFinalized() ; 
     533           } while (!notifiedFinalized) ; 
     534           clientPrimServer[i]->releaseBuffers(); 
     535         } 
     536            
    518537 
    519538       // (Last) context finalized message received 
     
    521540       { 
    522541         // Blocking send of context finalize message to its client (e.g. primary server or model) 
    523          info(100)<<"DEBUG: context "<<getId()<<" Send client finalize<<"<<endl ; 
     542         info(100)<<"DEBUG: context "<<getId()<<" Send client finalize"<<endl ; 
    524543         client->finalize(); 
     544         info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent"<<endl ; 
    525545         bool bufferReleased; 
    526546         do 
     
    529549           bufferReleased = !client->havePendingRequests(); 
    530550         } while (!bufferReleased); 
     551          
     552         bool notifiedFinalized=false ; 
     553         do 
     554         { 
     555  //         client->checkBuffers(); 
     556           notifiedFinalized=client->isNotifiedFinalized() ; 
     557         } while (!notifiedFinalized) ; 
     558         client->releaseBuffers(); 
     559          
    531560         finalized = true; 
    532  
     561         info(100)<<"DEBUG: context "<<getId()<<" bufferRelease OK"<<endl ; 
     562          
    533563         closeAllFile(); // Just move to here to make sure that server-level 1 can close files 
    534564         if (hasServer && !hasClient) 
     
    539569 
    540570         //! Deallocate client buffers 
    541          client->releaseBuffers(); 
     571//         client->releaseBuffers(); 
     572         info(100)<<"DEBUG: context "<<getId()<<" client release"<<endl ; 
     573 
     574/*          
    542575         for (int i = 0; i < clientPrimServer.size(); ++i) 
    543576           clientPrimServer[i]->releaseBuffers(); 
    544  
     577*/ 
    545578         //! Free internally allocated communicators 
    546579         for (std::list<MPI_Comm>::iterator it = comms.begin(); it != comms.end(); ++it) 
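
Finally, context finalize is acknowledged through the same windows: the server puts 1 into the fourth header word (CServerBuffer::notifyClientFinalize), and the client polls isNotifiedFinalized() before releasing its buffers. A sketch of the server side, reusing the hypothetical lock helpers from the first sketch:

    // Sketch (illustrative) of the finalize notification added by this changeset.
    #include <mpi.h>
    #include <cstddef>

    void lockRemoteBuffer(MPI_Win win, int targetRank, MPI_Aint headerAddr);
    void unlockRemoteBuffer(MPI_Win win, int targetRank, MPI_Aint headerAddr);

    // Server side, mirroring CServerBuffer::notifyClientFinalize(): write 1 into
    // the 'finalize' word (offset 3*sizeof(size_t)) of the client's header.
    void notifyClientFinalize(MPI_Win win, int clientRank, MPI_Aint clientHeaderAddr)
    {
      size_t finalize = 1;
      lockRemoteBuffer(win, clientRank, clientHeaderAddr);
      MPI_Put(&finalize, 1, MPI_LONG_LONG_INT, clientRank,
              MPI_Aint_add(clientHeaderAddr, 3 * sizeof(size_t)),
              1, MPI_LONG_LONG_INT, win);
      unlockRemoteBuffer(win, clientRank, clientHeaderAddr);
    }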