Changeset 1547


Ignore:
Timestamp:
06/20/18 09:09:23 (3 years ago)
Author:
ymipsl
Message:

New communication protocol between clients and servers, using a hybrid mode of point-to-point mixed with one-sided communication in order to avoid deadlocking. The constraint on the maximum number of events that can be buffered on the client side is removed.

A dev branch has been created so this can be tested before merging.

YM

Location:
XIOS/dev/dev_ym/XIOS_ONE_SIDED/src
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_client.cpp

    r1227 r1547  
    1212  size_t CClientBuffer::maxRequestSize = 0; 
    1313 
    14   CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize, StdSize maxBufferedEvents) 
     14  CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize) 
    1515    : interComm(interComm) 
    1616    , serverRank(serverRank) 
     
    2020    , current(0) 
    2121    , count(0) 
    22     , bufferedEvents(0) 
    23     , maxBufferedEvents(maxBufferedEvents) 
    2422    , pending(false) 
    25   { 
    26     buffer[0] = new char[bufferSize]; // transform it with MPI_ALLOC_MEM later 
    27     buffer[1] = new char[bufferSize]; 
     23    , hasWindows(false)  
     24  { 
     25      MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ; 
     26      MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ; 
     27 
     28 
     29    buffer[0] = bufferHeader[0]+headerSize ; 
     30    buffer[1] = bufferHeader[1]+headerSize ; 
     31    firstTimeLine[0]=(size_t*)bufferHeader[0] ; 
     32    firstTimeLine[1]=(size_t*)bufferHeader[1] ; 
     33    bufferCount[0]=(size_t*)bufferHeader[0] +1 ; 
     34    bufferCount[1]=(size_t*)bufferHeader[1] +1 ; 
     35    control[0]=(size_t*)bufferHeader[0] +2 ; 
     36    control[1]=(size_t*)bufferHeader[1] +2 ; 
     37 
     38    *firstTimeLine[0]=0 ; 
     39    *firstTimeLine[1]=0 ; 
     40    *bufferCount[0]=0 ; 
     41    *bufferCount[1]=0 ; 
     42    *control[0]=0 ; 
     43    *control[1]=0 ; 
     44    winState[0]=false ; 
     45    winState[1]=false ; 
    2846    retBuffer = new CBufferOut(buffer[current], bufferSize); 
    29     info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << " with a maximum of " << maxBufferedEvents << " buffered events" << endl; 
     47    info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl; 
    3048  } 
    3149 
    3250  CClientBuffer::~CClientBuffer() 
    3351  { 
    34    delete [] buffer[0]; 
    35    delete [] buffer[1]; 
    36    delete retBuffer; 
     52     freeWindows() ; 
     53     MPI_Free_mem(bufferHeader[0]) ; 
     54     MPI_Free_mem(bufferHeader[1]) ; 
     55     delete retBuffer; 
     56  } 
     57 
     58  void CClientBuffer::createWindows(MPI_Comm oneSidedComm) 
     59  { 
     60    MPI_Barrier(oneSidedComm) ; 
     61    MPI_Win_create(bufferHeader[0], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ; 
     62    MPI_Win_create(bufferHeader[1], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ; 
     63 
     64    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ; 
     65    *firstTimeLine[0]=0 ; 
     66    *bufferCount[0]=0 ; 
     67    *control[0]=0 ; 
     68    MPI_Win_unlock(0, windows[0]) ; 
     69 
     70    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ; 
     71    *firstTimeLine[1]=0 ; 
     72    *bufferCount[1]=0 ; 
     73    *control[1]=0 ; 
     74    MPI_Win_unlock(0, windows[1]) ; 
     75    winState[0]=false ; 
     76    winState[1]=false ; 
     77    MPI_Barrier(oneSidedComm) ; 
     78    hasWindows=true ; 
     79  } 
     80 
     81  void CClientBuffer::freeWindows() 
     82  { 
     83    if (hasWindows) 
     84    { 
     85      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ; 
     86      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ; 
     87      *control[0]=2 ; 
     88      *control[1]=2 ; 
     89      MPI_Win_unlock(0, windows[1]) ; 
     90      MPI_Win_unlock(0, windows[0]) ; 
     91       
     92      MPI_Win_free(&windows[0]) ; 
     93      MPI_Win_free(&windows[1]) ; 
     94      hasWindows=false ; 
     95    } 
     96  } 
     97  
     98  void CClientBuffer::lockBuffer(void) 
     99  { 
     100    if (hasWindows) 
     101    { 
     102      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[current]) ; 
     103      winState[current]=true ; 
     104    } 
     105  } 
     106 
     107  void CClientBuffer::unlockBuffer(void) 
     108  { 
     109    if (hasWindows) 
     110    { 
     111      MPI_Win_unlock(0, windows[current]) ; 
     112      winState[current]=false ; 
     113    } 
    37114  } 
    38115 
     
    44121  bool CClientBuffer::isBufferFree(StdSize size) 
    45122  { 
     123    bool loop=true ; 
     124    while (loop)  
     125    { 
     126      lockBuffer(); 
     127      if (*control[current]==0) loop=false ; // attemp to read from server ? 
     128      else unlockBuffer() ; 
     129    } 
     130     
    46131    if (size > bufferSize) 
    47132      ERROR("bool CClientBuffer::isBufferFree(StdSize size)", 
     
    59144    } 
    60145 
    61  
    62     return (size <= remain() && bufferedEvents < maxBufferedEvents); 
    63   } 
    64  
    65  
    66   CBufferOut* CClientBuffer::getBuffer(StdSize size) 
     146      count=*bufferCount[current] ; 
     147      return (size <= remain()); 
     148  } 
     149 
     150 
     151  CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size) 
    67152  { 
    68153    if (size <= remain()) 
    69154    { 
     155      info(100)<<"count "<<count<<"   bufferCount[current]  "<<*bufferCount[current]<<endl ; 
    70156      retBuffer->realloc(buffer[current] + count, size); 
    71157      count += size; 
    72       bufferedEvents++; 
     158      if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ; 
     159      *bufferCount[current]=count ; 
    73160      return retBuffer; 
    74161    } 
     
    81168  } 
    82169 
    83   bool CClientBuffer::checkBuffer(void) 
     170  bool CClientBuffer::checkBuffer(bool send) 
    84171  { 
    85172    MPI_Status status; 
     
    96183    if (!pending) 
    97184    { 
     185      if (!send) return false ; 
    98186      if (count > 0) 
    99187      { 
    100         MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 
    101         pending = true; 
    102         if (current == 1) current = 0; 
    103         else current = 1; 
    104         count = 0; 
    105         bufferedEvents = 0; 
     188        lockBuffer() ; 
     189        if (*control[current]==0 && bufferCount[current] > 0) 
     190        { 
     191          MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 
     192          pending = true; 
     193          *control[current]=0 ; 
     194          *firstTimeLine[current]=0 ; 
     195          *bufferCount[current]=0 ; 
     196 
     197           unlockBuffer() ; 
     198 
     199          if (current == 1) current = 0; 
     200          else current = 1; 
     201          count = 0; 
     202        } 
     203        else unlockBuffer() ; 
    106204      } 
    107205    } 
     
    112210  bool CClientBuffer::hasPendingRequest(void) 
    113211  { 
     212    
     213    lockBuffer() ; 
     214    count=*bufferCount[current] ; 
     215    unlockBuffer() ; 
     216 
    114217    return (pending || count > 0); 
    115218  } 
     219 
     220 
    116221} 
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_client.hpp

    r1227 r1547  
    1414      static size_t maxRequestSize; 
    1515 
    16       CClientBuffer(MPI_Comm intercomm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize, StdSize maxBufferedEvents); 
     16      CClientBuffer(MPI_Comm intercomm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize); 
    1717      ~CClientBuffer(); 
    18  
     18      void createWindows(MPI_Comm oneSidedComm) ; 
     19      void freeWindows(void) ; 
     20      void lockBuffer(void) ; 
     21      void unlockBuffer(void) ; 
     22          
    1923      bool isBufferFree(StdSize size); 
    20       CBufferOut* getBuffer(StdSize size); 
    21       bool checkBuffer(void); 
     24      CBufferOut* getBuffer(size_t timeLine, StdSize size); 
     25      bool checkBuffer(bool send=false); 
    2226      bool hasPendingRequest(void); 
    2327      StdSize remain(void); 
     
    2529    private: 
    2630      char* buffer[2]; 
    27  
     31      char* bufferHeader[2]; 
     32      size_t* firstTimeLine[2] ; 
     33      size_t* bufferCount[2] ; 
     34      size_t* control[2] ; 
     35      bool winState[2] ; 
    2836      int current; 
    2937 
    3038      StdSize count; 
    31       StdSize bufferedEvents; 
    3239      StdSize maxEventSize; 
    33       const StdSize maxBufferedEvents; 
    3440      const StdSize bufferSize; 
    3541      const StdSize estimatedMaxEventSize; 
     
    4349      CBufferOut* retBuffer; 
    4450      const MPI_Comm interComm; 
     51      MPI_Win windows[2] ; 
     52      bool hasWindows ; 
     53      static const int headerSize=3*sizeof(size_t); 
    4554  }; 
    4655} 
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_server.cpp

    r885 r1547  
    77{ 
    88 
    9   CServerBuffer::CServerBuffer(StdSize buffSize) 
     9  CServerBuffer::CServerBuffer(StdSize buffSize) : hasWindows(false) 
    1010  { 
    1111    size = 3 * buffSize; 
     
    1313    current = 1; 
    1414    end = size; 
     15    used=0 ; 
    1516    buffer = new char[size]; // use MPI_ALLOC_MEM later? 
     17    currentWindows=0 ; 
    1618  } 
    1719 
     
    2123  } 
    2224 
     25  void CServerBuffer::updateCurrentWindows(void) 
     26  { 
     27    if (currentWindows==0) currentWindows=1 ; 
     28    else currentWindows=0 ; 
     29  } 
     30   
     31  void CServerBuffer::createWindows(MPI_Comm oneSidedComm) 
     32  { 
     33    MPI_Barrier(oneSidedComm) ; 
     34    MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ; 
     35    MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ; 
     36    hasWindows=true ; 
     37    MPI_Barrier(oneSidedComm) ; 
     38  } 
     39 
     40  bool CServerBuffer::freeWindows() 
     41  { 
     42    if (hasWindows) 
     43    { 
     44      size_t header[3] ; 
     45      size_t& control=header[2] ; 
     46      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows[0]) ; 
     47      MPI_Get(&control, 1, MPI_LONG_LONG_INT, 0 , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[0]) ; 
     48      MPI_Win_unlock(0,windows[0]) ; 
     49      if (control==2)  // ok for free windows 
     50      { 
     51        MPI_Win_free( &(windows[0])) ; 
     52        MPI_Win_free( &(windows[1])) ; 
     53        hasWindows=false ; 
     54        return true ; 
     55      } 
     56      else return false ; 
     57    } 
     58    else return true ; 
     59  } 
    2360 
    2461  bool CServerBuffer::isBufferFree(size_t count) 
     
    72109  } 
    73110 
     111  bool CServerBuffer::isBufferEmpty(void) 
     112  { 
     113    if (used==0) return true ; 
     114    else return false; 
     115  } 
    74116 
    75117  void* CServerBuffer::getBuffer(size_t count) 
     
    128170    } 
    129171 
     172    used+=count ; 
    130173    return ret ; 
    131174  } 
     
    167210      } 
    168211    } 
    169   } 
    170  
     212    used-=count ; 
     213  } 
     214 
     215  bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count) 
     216  { 
     217    if (!hasWindows) return false ; 
     218 
     219     
     220    size_t header[3] ; 
     221    size_t& clientTimeline=header[0] ; 
     222    size_t& clientCount=header[1] ; 
     223    size_t& control=header[2] ; 
     224    bool ok=false ; 
     225     
     226    MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows[currentWindows]) ; 
     227 
     228    MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, 0 , 0, 1, MPI_LONG_LONG_INT,windows[currentWindows]) ; 
     229    MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, 0 , 1*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[currentWindows]) ; 
     230    control=1 ; 
     231    MPI_Put(&control, 1, MPI_LONG_LONG_INT, 0 , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[currentWindows]) ; 
     232    
     233    MPI_Win_unlock(0,windows[currentWindows]) ; 
     234 
     235    if (timeLine==clientTimeline) 
     236    { 
     237 
     238      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows[currentWindows]) ; 
     239      buffer=(char*)getBuffer(clientCount) ; 
     240      count=clientCount ; 
     241      MPI_Get(buffer, clientCount, MPI_CHAR, 0, 3*sizeof(size_t) , clientCount, MPI_CHAR, windows[currentWindows]) ; 
     242      clientTimeline = 0 ; 
     243      clientCount = 0 ; 
     244      control=0 ; 
     245      MPI_Put(&header[0], 3, MPI_LONG_LONG_INT, 0, 0 , 3, MPI_LONG_LONG_INT,windows[currentWindows]) ; 
     246   
     247      MPI_Win_unlock(0,windows[currentWindows]) ; 
     248      ok=true ; 
     249    } 
     250    else 
     251    { 
     252      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows[currentWindows]) ; 
     253      control=0 ; 
     254      MPI_Put(&control, 1, MPI_LONG_LONG_INT, 0 , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[currentWindows]) ; 
     255      MPI_Win_unlock(0,windows[currentWindows]) ; 
     256    } 
     257 
     258    if (ok) return true ; 
     259 
     260    return false ; 
     261  } 
     262     
     263    
    171264} 
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_server.hpp

    r717 r1547  
    1818      void* getBuffer(size_t count) ; 
    1919      void freeBuffer(size_t count) ; 
    20  
     20      void createWindows(MPI_Comm oneSidedComm) ; 
     21      bool freeWindows(void) ; 
     22      bool getBufferFromClient(size_t timeLine, char* & buffer, size_t& count) ; 
     23      bool isBufferEmpty(void) ; 
     24      void updateCurrentWindows(void) ; 
    2125    private: 
    2226      char* buffer; 
     
    2529      size_t end; 
    2630      size_t size; 
     31      size_t used ;  // count of element occupied 
     32      MPI_Win windows[2] ; 
     33      int currentWindows ; 
     34      bool hasWindows ; 
    2735  }; 
    2836} 
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_client.cpp

    r1475 r1547  
    2424     : mapBufferSize_(), parentServer(cxtSer), maxBufferedEvents(4) 
    2525    { 
     26      pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 
     27      if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode 
     28       
    2629      context = parent; 
    2730      intraComm = intraComm_; 
     
    3740      computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader); 
    3841 
    39       timeLine = 0; 
     42      if (flag) MPI_Intercomm_merge(interComm_,false,&interCommMerged) ; 
     43 
     44      MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ; 
     45 
     46      timeLine = 1; 
    4047    } 
    4148 
     
    116123        list<int> sizes = event.getSizes(); 
    117124 
    118         // We force the getBuffers call to be non-blocking on classical servers 
     125         // We force the getBuffers call to be non-blocking on classical servers 
    119126        list<CBufferOut*> buffList; 
    120         bool couldBuffer = getBuffers(ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0) )); 
    121 //        bool couldBuffer = getBuffers(ranks, sizes, buffList, CXios::isServer ); 
    122  
    123         if (couldBuffer) 
    124         { 
    125           event.send(timeLine, sizes, buffList); 
    126  
    127           checkBuffers(ranks); 
    128  
    129           if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 
    130           { 
    131             waitEvent(ranks); 
    132             CContext::setCurrent(context->getId()); 
    133           } 
    134         } 
    135         else 
    136         { 
    137           tmpBufferedEvent.ranks = ranks; 
    138           tmpBufferedEvent.sizes = sizes; 
    139  
    140           for (list<int>::const_iterator it = sizes.begin(); it != sizes.end(); it++) 
    141             tmpBufferedEvent.buffers.push_back(new CBufferOut(*it)); 
    142           info(100)<<"DEBUG : temporaly event created : timeline "<<timeLine<<endl ; 
    143           event.send(timeLine, tmpBufferedEvent.sizes, tmpBufferedEvent.buffers); 
     127//        bool couldBuffer = getBuffers(timeLine, ranks, sizes, buffList, (!CXios::isClient && (CServer::serverLevel == 0) )); 
     128        getBuffers(timeLine, ranks, sizes, buffList) ; 
     129 
     130        event.send(timeLine, sizes, buffList); 
     131        unlockBuffers(ranks) ; 
     132           
     133        checkBuffers(ranks); 
     134 
     135        if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 
     136        { 
     137          waitEvent(ranks); 
     138          CContext::setCurrent(context->getId()); 
    144139        } 
    145140      } 
    146141 
    147142      timeLine++; 
    148     } 
    149  
    150     /*! 
    151      * Send the temporarily buffered event (if any). 
    152      * 
    153      * \return true if a temporarily buffered event could be sent, false otherwise  
    154      */ 
    155     bool CContextClient::sendTemporarilyBufferedEvent() 
    156     { 
    157       bool couldSendTmpBufferedEvent = false; 
    158  
    159       if (hasTemporarilyBufferedEvent()) 
    160       { 
    161         list<CBufferOut*> buffList; 
    162         if (getBuffers(tmpBufferedEvent.ranks, tmpBufferedEvent.sizes, buffList, true)) // Non-blocking call 
    163         { 
    164           list<CBufferOut*>::iterator it, itBuffer; 
    165  
    166           for (it = tmpBufferedEvent.buffers.begin(), itBuffer = buffList.begin(); it != tmpBufferedEvent.buffers.end(); it++, itBuffer++) 
    167             (*itBuffer)->put((char*)(*it)->start(), (*it)->count()); 
    168  
    169           info(100)<<"DEBUG : temporaly event sent "<<endl ; 
    170           checkBuffers(tmpBufferedEvent.ranks); 
    171  
    172           tmpBufferedEvent.clear(); 
    173  
    174           couldSendTmpBufferedEvent = true; 
    175         } 
    176       } 
    177  
    178       return couldSendTmpBufferedEvent; 
    179143    } 
    180144 
     
    203167     * it is explicitly requested to be non-blocking. 
    204168     * 
     169     * 
     170     * \param [in] timeLine time line of the event which will be sent to servers 
    205171     * \param [in] serverList list of rank of connected server 
    206172     * \param [in] sizeList size of message corresponding to each connection 
     
    209175     * \return whether the already allocated buffers could be used 
    210176    */ 
    211     bool CContextClient::getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, 
     177    bool CContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, 
    212178                                    bool nonBlocking /*= false*/) 
    213179    { 
     
    229195      } 
    230196 
     197      if (CXios::isServer) info(100)<<" getBuffers : entering loop"<<endl ; 
    231198      CTimer::get("Blocking time").resume(); 
    232199      do 
     
    234201        areBuffersFree = true; 
    235202        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
     203        { 
    236204          areBuffersFree &= (*itBuffer)->isBufferFree(*itSize); 
    237  
     205        } 
     206 
     207        if (CXios::isServer) info(100)<<" getBuffers : areBuffersFree ? "<<areBuffersFree<<endl ; ; 
    238208        if (!areBuffersFree) 
    239209        { 
     210          for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer(); 
     211          if (CXios::isServer) info(100)<<" getBuffers : buffers unlocked "<<endl ; 
    240212          checkBuffers(); 
    241           if (CServer::serverLevel == 0) 
    242             context->server->listen(); 
    243  
     213          if (CXios::isServer) info(100)<<" getBuffers : buffers checked "<<endl ; 
     214          if (CServer::serverLevel == 0)  context->server->listen(); 
     215          if (CXios::isServer) info(100)<<" getBuffers : server listened... "<<endl ; 
    244216          else if (CServer::serverLevel == 1) 
    245217          { 
    246218            context->server->listen(); 
    247             for (int i = 0; i < context->serverPrimServer.size(); ++i) 
    248               context->serverPrimServer[i]->listen(); 
     219            for (int i = 0; i < context->serverPrimServer.size(); ++i)  context->serverPrimServer[i]->listen(); 
    249220            CServer::contextEventLoop(false) ; // avoid dead-lock at finalize... 
    250221          } 
    251222 
    252           else if (CServer::serverLevel == 2) 
    253             context->server->listen(); 
     223          else if (CServer::serverLevel == 2) context->server->listen(); 
    254224 
    255225        } 
    256226      } while (!areBuffersFree && !nonBlocking); 
    257  
     227      if (CXios::isServer) info(100)<<" getBuffers : out of loop"<<endl ; 
    258228      CTimer::get("Blocking time").suspend(); 
    259229 
     
    261231      { 
    262232        for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
    263           retBuffers.push_back((*itBuffer)->getBuffer(*itSize)); 
    264       } 
    265  
     233          retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize)); 
     234      } 
     235      if (CXios::isServer) info(100)<<" getBuffers : message pushed"<<endl ; 
    266236      return areBuffersFree; 
    267237   } 
     
    279249        maxEventSizes[rank] = CXios::minBufferSize; 
    280250      } 
    281       CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank], maxBufferedEvents); 
     251      CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank]); 
    282252      // Notify the server 
    283       CBufferOut* bufOut = buffer->getBuffer(sizeof(StdSize)); 
     253      CBufferOut* bufOut = buffer->getBuffer(0, sizeof(StdSize)); 
    284254      bufOut->put(mapBufferSize_[rank]); // Stupid C++ 
    285       buffer->checkBuffer(); 
     255      buffer->checkBuffer(true); 
     256 
     257      if (!isAttachedModeEnabled()) // create windows only in server mode 
     258      { 
     259        MPI_Comm OneSidedInterComm, oneSidedComm ; 
     260        MPI_Intercomm_create(commSelf, 0, interCommMerged, clientSize+rank, 0, &OneSidedInterComm ); 
     261        MPI_Intercomm_merge(OneSidedInterComm,false,&oneSidedComm); 
     262        info(100)<<"DEBUG: before creating windows (client)"<<endl ; 
     263        buffer->createWindows(oneSidedComm) ; 
     264        info(100)<<"DEBUG: after creating windows (client)"<<endl ; 
     265      } 
     266        
    286267   } 
    287268 
     
    295276      bool pending = false; 
    296277      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 
    297         pending |= itBuff->second->checkBuffer(); 
     278        pending |= itBuff->second->checkBuffer(!pureOneSided); 
    298279      return pending; 
    299280   } 
     
    305286      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) 
    306287      { 
     288//          CEventClient event(CContext::GetType(), CContext::EVENT_ID_CLOSE_P2P_CHANNEL); 
     289//          CMessage msg; 
     290//          event.push(itBuff->first, 1, msg); 
     291//          timeLine = std::numeric_limits<size_t>::max() ; 
     292//          sendEvent(event); 
     293//          while (itBuff->second->checkBuffer(!pureOneSided)); 
    307294          delete itBuff->second; 
    308295      } 
     
    310297   } 
    311298 
     299  /*! 
     300   Lock the buffers for one sided communications 
     301   \param [in] ranks list rank of server to which client connects to 
     302   */ 
     303   void CContextClient::lockBuffers(list<int>& ranks) 
     304   { 
     305      list<int>::iterator it; 
     306      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->lockBuffer(); 
     307   } 
     308 
     309  /*! 
     310   Unlock the buffers for one sided communications 
     311   \param [in] ranks list rank of server to which client connects to 
     312   */ 
     313   void CContextClient::unlockBuffers(list<int>& ranks) 
     314   { 
     315      list<int>::iterator it; 
     316      for (it = ranks.begin(); it != ranks.end(); it++) buffers[*it]->unlockBuffer(); 
     317   } 
     318       
    312319   /*! 
    313320   Verify state of buffers corresponding to a connection 
     
    319326      list<int>::iterator it; 
    320327      bool pending = false; 
    321       for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(); 
     328      for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(!pureOneSided); 
    322329      return pending; 
    323330   } 
     
    333340     mapBufferSize_ = mapSize; 
    334341     maxEventSizes = maxEventSize; 
    335  
    336      // Compute the maximum number of events that can be safely buffered. 
    337      double minBufferSizeEventSizeRatio = std::numeric_limits<double>::max(); 
    338      for (std::map<int,StdSize>::const_iterator it = mapSize.begin(), ite = mapSize.end(); it != ite; ++it) 
    339      { 
    340        double ratio = double(it->second) / maxEventSizes[it->first]; 
    341        if (ratio < minBufferSizeEventSizeRatio) minBufferSizeEventSizeRatio = ratio; 
    342      } 
    343      MPI_Allreduce(MPI_IN_PLACE, &minBufferSizeEventSizeRatio, 1, MPI_DOUBLE, MPI_MIN, intraComm); 
    344  
    345      if (minBufferSizeEventSizeRatio < 1.0) 
    346      { 
    347        ERROR("void CContextClient::setBufferSize(const std::map<int,StdSize>& mapSize, const std::map<int,StdSize>& maxEventSize)", 
    348              << "The buffer sizes and the maximum events sizes are incoherent."); 
    349      } 
    350      else if (minBufferSizeEventSizeRatio == std::numeric_limits<double>::max()) 
    351        minBufferSizeEventSizeRatio = 1.0; // In this case, maxBufferedEvents will never be used but we want to avoid any floating point exception 
    352  
    353      maxBufferedEvents = size_t(2 * minBufferSizeEventSizeRatio) // there is room for two local buffers on the server 
    354                           + size_t(minBufferSizeEventSizeRatio)  // one local buffer can always be fully used 
    355                           + 1;                                   // the other local buffer might contain only one event 
    356342   } 
    357343 
     
    408394  { 
    409395    map<int,CClientBuffer*>::iterator itBuff; 
     396    std::list<int>::iterator ItServerLeader;  
     397     
    410398    bool stop = false; 
    411399 
    412     CTimer::get("Blocking time").resume(); 
    413     while (hasTemporarilyBufferedEvent()) 
    414     { 
    415       checkBuffers(); 
    416       sendTemporarilyBufferedEvent(); 
    417     } 
    418     CTimer::get("Blocking time").suspend(); 
    419  
     400    int* nbServerConnectionLocal  = new int[serverSize] ; 
     401    int* nbServerConnectionGlobal  = new int[serverSize] ; 
     402    for(int i=0;i<serverSize;++i) nbServerConnectionLocal[i]=0 ; 
     403    for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++)  nbServerConnectionLocal[itBuff->first]=1 ; 
     404    for (ItServerLeader = ranksServerLeader.begin(); ItServerLeader != ranksServerLeader.end(); ItServerLeader++)  nbServerConnectionLocal[*ItServerLeader]=1 ; 
     405     
     406    MPI_Allreduce(nbServerConnectionLocal, nbServerConnectionGlobal, serverSize, MPI_INT, MPI_SUM, intraComm); 
     407     
    420408    CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); 
     409    CMessage msg; 
     410 
     411    for (int i=0;i<serverSize;++i) if (nbServerConnectionLocal[i]==1) event.push(i, nbServerConnectionGlobal[i], msg) ; 
     412    sendEvent(event); 
     413 
     414    delete[] nbServerConnectionLocal ; 
     415    delete[] nbServerConnectionGlobal ; 
     416/*     
    421417    if (isServerLeader()) 
    422418    { 
     
    431427    } 
    432428    else sendEvent(event); 
     429*/ 
    433430 
    434431    CTimer::get("Blocking time").resume(); 
    435 //    while (!stop) 
    436     { 
    437       checkBuffers(); 
    438       if (hasTemporarilyBufferedEvent()) 
    439         sendTemporarilyBufferedEvent(); 
    440  
    441       stop = true; 
    442 //      for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop &= !itBuff->second->hasPendingRequest(); 
    443     } 
     432    checkBuffers(); 
    444433    CTimer::get("Blocking time").suspend(); 
    445434 
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_client.hpp

    r1232 r1547  
    3131      // Send event to server 
    3232      void sendEvent(CEventClient& event); 
    33       bool sendTemporarilyBufferedEvent(); 
    3433      void waitEvent(list<int>& ranks); 
    3534 
    3635      // Functions to set/get buffers 
    37       bool getBuffers(const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false); 
     36      bool getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false); 
    3837      void newBuffer(int rank); 
    3938      bool checkBuffers(list<int>& ranks); 
     
    4847 
    4948      bool isAttachedModeEnabled() const; 
    50       bool hasTemporarilyBufferedEvent() const { return !tmpBufferedEvent.isEmpty(); }; 
    5149 
    5250      static void computeLeader(int clientRank, int clientSize, int serverSize, 
     
    7169      int serverSize; //!< Size of server group 
    7270 
    73       MPI_Comm interComm; //!< Communicator of server group 
     71      MPI_Comm interComm; //!< Communicator of server group (interCommunicator) 
     72 
     73      MPI_Comm interCommMerged; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication. 
    7474 
    7575      MPI_Comm intraComm; //!< Communicator of client group 
    7676 
     77      MPI_Comm commSelf; //!< Communicator of the client alone. Needed to create a new communicator between 1 proc client and 1 proc server for one sided communication 
     78 
    7779      map<int,CClientBuffer*> buffers; //!< Buffers for connection to servers 
    7880 
     81      bool pureOneSided ; //!< if true, client will communicated with servers only trough one sided communication. Otherwise the hybrid mode P2P /One sided is used. 
     82 
    7983    private: 
     84      void lockBuffers(list<int>& ranks) ; 
     85      void unlockBuffers(list<int>& ranks) ; 
     86       
    8087      //! Mapping of server and buffer size for each connection to server 
    8188      std::map<int,StdSize> mapBufferSize_; 
     
    8491      //! Maximum number of events that can be buffered 
    8592      StdSize maxBufferedEvents; 
    86  
    87       struct { 
    88         std::list<int> ranks, sizes; 
    89         std::list<CBufferOut*> buffers; 
    90  
    91         bool isEmpty() const { return ranks.empty(); }; 
    92         void clear() { 
    93           ranks.clear(); 
    94           sizes.clear(); 
    95  
    96           for (std::list<CBufferOut*>::iterator it = buffers.begin(); it != buffers.end(); it++) 
    97             delete *it; 
    98  
    99           buffers.clear(); 
    100         }; 
    101       } tmpBufferedEvent; //! Event temporarily buffered (used only on the server) 
    10293 
    10394      //! Context for server (Only used in attached mode) 
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_server.cpp

    r1230 r1547  
    3333    int flag; 
    3434    MPI_Comm_test_inter(interComm,&flag); 
     35 
     36    if (flag) attachedMode=false ; 
     37    else  attachedMode=true ; 
     38     
    3539    if (flag) MPI_Comm_remote_size(interComm,&commSize); 
    3640    else  MPI_Comm_size(interComm,&commSize); 
    3741 
    38     currentTimeLine=0; 
     42      
     43    currentTimeLine=1; 
    3944    scheduled=false; 
    4045    finished=false; 
     
    4449    else 
    4550      hashId=hashString(context->getId()); 
    46   } 
    47  
     51 
     52    if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,true,&interCommMerged) ; 
     53     
     54    MPI_Comm_split(intraComm_,intraCommRank,intraCommRank, &commSelf) ; 
     55    itLastTimeLine=lastTimeLine.begin() ; 
     56 
     57    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 
     58    if (isAttachedModeEnabled()) pureOneSided=false ; // no one-sided in attached mode 
     59       
     60  } 
     61 
     62//! Attached mode is used ? 
     63//! \return true if attached mode is used, false otherwise 
     64  bool CContextServer::isAttachedModeEnabled() const 
     65  { 
     66    return attachedMode ; 
     67  } 
     68   
    4869  void CContextServer::setPendingEvent(void) 
    4970  { 
     
    6384  bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/) 
    6485  { 
     86//    info(100)<<"CContextServer::eventLoop : listen"<<endl ; 
    6587    listen(); 
     88//    info(100)<<"CContextServer::eventLoop : checkPendingRequest"<<endl ; 
    6689    checkPendingRequest(); 
    67     if (enableEventsProcessing) 
    68       processEvents(); 
     90//    info(100)<<"CContextServer::eventLoop : process events"<<endl ; 
     91    if (enableEventsProcessing)  processEvents(); 
     92//    info(100)<<"CContextServer::eventLoop : finished "<<finished<<endl ; 
    6993    return finished; 
    7094  } 
     
    121145       mapBufferSize_.insert(std::make_pair(rank, buffSize)); 
    122146       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first; 
     147       if (!isAttachedModeEnabled()) 
     148       { 
     149         MPI_Comm OneSidedInterComm, oneSidedComm ; 
     150         MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0, &OneSidedInterComm ); 
     151         MPI_Intercomm_merge(OneSidedInterComm,true,&oneSidedComm); 
     152         info(100)<<"DEBUG: before creating windows (server)"<<endl ; 
     153         buffers[rank]->createWindows(oneSidedComm) ; 
     154         info(100)<<"DEBUG: after creating windows (server)"<<endl ; 
     155       } 
     156       lastTimeLine[rank]=0 ; 
     157       itLastTimeLine=lastTimeLine.begin() ; 
     158 
    123159       return true; 
    124160    } 
     
    157193      if (flag==true) 
    158194      { 
     195        buffers[rank]->updateCurrentWindows() ; 
    159196        recvRequest.push_back(rank); 
    160197        MPI_Get_count(&status,MPI_CHAR,&count); 
     
    170207  } 
    171208 
     209  void CContextServer::getBufferFromClient(size_t timeLine) 
     210  { 
     211    if (!isAttachedModeEnabled()) // one-sided deactivated in attached mode 
     212    {   
     213      int rank ; 
     214      char *buffer ; 
     215      size_t count ;  
     216 
     217      if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ; 
     218      for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine) 
     219      { 
     220        rank=itLastTimeLine->first ; 
     221        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0) 
     222        { 
     223          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) 
     224          { 
     225            info(100)<<"get buffer from client : timeLine "<<timeLine<<endl ; 
     226            processRequest(rank, buffer, count); 
     227            break ; 
     228          } 
     229        } 
     230      } 
     231    } 
     232  } 
     233          
     234        
    172235  void CContextServer::processRequest(int rank, char* buff,int count) 
    173236  { 
     
    176239    char* startBuffer,endBuffer; 
    177240    int size, offset; 
    178     size_t timeLine; 
     241    size_t timeLine=0; 
    179242    map<size_t,CEventServer*>::iterator it; 
    180243 
     244     
    181245    CTimer::get("Process request").resume(); 
    182246    while(count>0) 
     
    185249      CBufferIn newBuffer(startBuffer,buffer.remain()); 
    186250      newBuffer>>size>>timeLine; 
    187  
     251      info(100)<<"new event :   timeLine : "<<timeLine<<"  size : "<<size<<endl ; 
    188252      it=events.find(timeLine); 
    189253      if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first; 
     
    193257      count=buffer.remain(); 
    194258    } 
     259 
     260    if (timeLine>0) lastTimeLine[rank]=timeLine ; 
     261     
    195262    CTimer::get("Process request").suspend(); 
    196263  } 
     
    230297        } 
    231298      } 
    232     } 
     299      else getBufferFromClient(currentTimeLine) ; 
     300    } 
     301    else if (pureOneSided) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line 
    233302  } 
    234303 
     
    237306    map<int,CServerBuffer*>::iterator it; 
    238307    for(it=buffers.begin();it!=buffers.end();++it) delete it->second; 
     308  } 
     309 
     310  void CContextServer::releaseBuffers() 
     311  { 
     312    map<int,CServerBuffer*>::iterator it; 
     313    bool out ; 
     314    do 
     315    { 
     316      out=true ; 
     317      for(it=buffers.begin();it!=buffers.end();++it) 
     318      { 
     319        out = out && it->second->freeWindows() ; 
     320 
     321      } 
     322    } while (! out) ;  
    239323  } 
    240324 
     
    254338      finished=true; 
    255339      info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl; 
     340      releaseBuffers() ; 
    256341      context->finalize(); 
    257342      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 
  • XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/context_server.hpp

    r1228 r1547  
    1919    bool listenPendingRequest(MPI_Status& status) ; 
    2020    void checkPendingRequest(void) ; 
     21    void getBufferFromClient(size_t timeLine) ; 
    2122    void processRequest(int rank, char* buff,int count) ; 
    2223    void processEvents(void) ; 
     
    2526    void setPendingEvent(void) ; 
    2627    bool hasPendingEvent(void) ; 
    27  
     28    bool isAttachedModeEnabled() const; 
     29    void releaseBuffers(void) ; 
     30     
    2831    MPI_Comm intraComm ; 
    2932    int intraCommSize ; 
     
    3336    int commSize ; 
    3437 
     38    MPI_Comm interCommMerged; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication. 
     39 
     40    MPI_Comm commSelf; //!< Communicator of the server alone. Needed to create a new communicator between 1 proc client and 1 proc server for one sided communication 
     41 
    3542    map<int,CServerBuffer*> buffers ; 
     43    map<int,size_t> lastTimeLine ; //!< last event time line for a processed request 
     44    map<int,size_t>::iterator itLastTimeLine ; //!< iterator on lastTimeLine 
    3645    map<int,MPI_Request> pendingRequest ; 
    3746    map<int,char*> bufferRequest ; 
     
    4453    bool pendingEvent ; 
    4554    bool scheduled  ;    /*!< event of current timeline is already scheduled ? */ 
     55    bool attachedMode ;  //! true if attached mode is enabled otherwise false 
      56    bool pureOneSided ; //!< if true, client will communicate with servers only through one-sided communication. Otherwise the hybrid mode P2P / one-sided is used. 
     57          
    4658    size_t hashId ; 
    4759 
Note: See TracChangeset for help on using the changeset viewer.