Changeset 2246


Ignore:
Timestamp:
10/11/21 14:41:56 (8 weeks ago)
Author:
ymipsl
Message:
  • Update of the tranfer protocol using one sided communication
  • Introduce MPI_Improb/MPI_mrecv to listen incomming request
  • Introducing latency when looping over managers

YM

Location:
XIOS/dev/dev_ym/XIOS_COUPLING/src
Files:
1 added
25 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.cpp

    r2221 r2246  
    88#include "tracer.hpp" 
    99#include "timeline_events.hpp" 
     10#include "timer.hpp" 
    1011 
    1112namespace xios 
     
    2930    else hasWindows=true ; 
    3031 
    31       MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ; 
    32       MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ; 
    33       buffer[0] = bufferHeader[0]+headerSize ; 
    34       buffer[1] = bufferHeader[1]+headerSize ; 
    35       firstTimeLine[0]=(size_t*)bufferHeader[0] ; 
    36       firstTimeLine[1]=(size_t*)bufferHeader[1] ; 
    37       bufferCount[0]=(size_t*)bufferHeader[0] +1 ; 
    38       bufferCount[1]=(size_t*)bufferHeader[1] +1 ; 
    39       control[0]=(size_t*)bufferHeader[0] +2 ; 
    40       control[1]=(size_t*)bufferHeader[1] +2 ; 
    41       finalize[0]=(size_t*)bufferHeader[0] +3 ; 
    42       finalize[1]=(size_t*)bufferHeader[1] +3 ; 
     32      MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 
     33      MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 
     34      buffer[0] = bufferHeader[0]+headerSize_ ; 
     35      buffer[1] = bufferHeader[1]+headerSize_ ; 
     36      firstTimeLine[0]=(size_t*)bufferHeader[0] + timeLineOffset_ ; 
     37      firstTimeLine[1]=(size_t*)bufferHeader[1] + timeLineOffset_ ; 
     38      bufferCount[0]=(size_t*)bufferHeader[0] + countOffset_ ; 
     39      bufferCount[1]=(size_t*)bufferHeader[1] + countOffset_ ; 
     40      control[0]=(size_t*)bufferHeader[0] + controlOffset_ ; 
     41      control[1]=(size_t*)bufferHeader[1] + controlOffset_ ; 
     42      notify[0]=(size_t*)bufferHeader[0] + notifyOffset_ ; 
     43      notify[1]=(size_t*)bufferHeader[1] + notifyOffset_ ; 
    4344 
    4445      *firstTimeLine[0]=0 ; 
     
    4849      *control[0]=0 ; 
    4950      *control[1]=0 ; 
    50       *finalize[0]=0 ; 
    51       *finalize[1]=0 ; 
     51      *notify[0]=notifyNothing_ ; 
     52      *notify[1]=notifyNothing_ ; 
    5253      winState[0]=false ; 
    5354      winState[1]=false ; 
     
    5758    {   
    5859     
    59       MPI_Aint buffSize=bufferSize+headerSize ; 
     60      MPI_Aint buffSize=bufferSize+headerSize_ ; 
    6061      MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ; 
    6162      MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ; 
     
    106107  } 
    107108 
    108 /*  void CClientBuffer::createWindows(MPI_Comm oneSidedComm) 
    109   { 
    110     MPI_Barrier(oneSidedComm) ; 
    111     MPI_Win_create(bufferHeader[0], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ; 
    112     MPI_Win_create(bufferHeader[1], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ; 
    113  
    114     MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ; 
    115     *firstTimeLine[0]=0 ; 
    116     *bufferCount[0]=0 ; 
    117     *control[0]=0 ; 
    118     MPI_Win_unlock(0, windows[0]) ; 
    119  
    120     MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ; 
    121     *firstTimeLine[1]=0 ; 
    122     *bufferCount[1]=0 ; 
    123     *control[1]=0 ; 
    124     MPI_Win_unlock(0, windows[1]) ; 
    125     winState[0]=false ; 
    126     winState[1]=false ; 
    127     MPI_Barrier(oneSidedComm) ; 
    128     hasWindows=true ; 
    129   } 
    130 */ 
    131  
    132 /*   
    133   void CClientBuffer::freeWindows() 
    134   { 
    135     if (hasWindows) 
    136     { 
    137       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[0]) ; 
    138       MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[1]) ; 
    139       *control[0]=2 ; 
    140       *control[1]=2 ; 
    141       MPI_Win_unlock(0, windows_[1]) ; 
    142       MPI_Win_unlock(0, windows_[0]) ; 
    143        
    144       MPI_Win_free(&windows_[0]) ; 
    145       MPI_Win_free(&windows_[1]) ; 
    146       hasWindows=false ; 
    147     } 
    148   } 
    149 */  
    150109  void CClientBuffer::lockBuffer(void) 
    151110  { 
    152     if (hasWindows) 
    153     { 
    154    //   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[current]) ; 
    155       long long int lock=1 ; 
    156       long long int zero=0, one=1 ; 
    157       
     111    CTimer::get("lock buffer").resume(); 
     112    if (hasWindows) 
     113    { 
    158114      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ; 
    159       
    160       while(lock!=0) 
    161       { 
    162         MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)), 
    163                              windows_[current]) ; 
    164         MPI_Win_flush(clientRank_, windows_[current]) ; 
    165       } 
    166  
    167 //      info(100)<<"Buffer locked "<<&windows_<<"  "<<current<<endl ; 
    168115      winState[current]=true ; 
    169116    } 
     117    CTimer::get("lock buffer").suspend(); 
    170118  } 
    171119 
    172120  void CClientBuffer::unlockBuffer(void) 
    173121  { 
    174     if (hasWindows) 
    175     { 
    176       long long int lock=1 ; 
    177       long long int zero=0, one=1 ; 
    178  
    179       MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)), 
    180                              windows_[current]) ; 
     122    CTimer::get("unlock buffer").resume(); 
     123    if (hasWindows) 
     124    { 
    181125      MPI_Win_unlock(clientRank_, windows_[current]) ; 
    182  
    183  //     info(100)<<"Buffer unlocked "<<&windows_<<"  "<<current<<endl ; 
    184126      winState[current]=false ; 
    185127    } 
     128    CTimer::get("unlock buffer").suspend(); 
    186129  } 
    187130 
     
    193136  bool CClientBuffer::isBufferFree(StdSize size) 
    194137  { 
    195 //    bool loop=true ; 
    196 //    while (loop)  
    197 //    { 
    198 //      lockBuffer(); 
    199 //      if (*control[current]==0) loop=false ; // attemp to read from server ? 
    200 //      else unlockBuffer() ; 
    201 //    } 
    202138   
    203139    lockBuffer(); 
     
    208144    if (size > bufferSize) 
    209145    { 
    210       // ERROR("bool CClientBuffer::isBufferFree(StdSize size)", 
    211       //      << "The requested size (" << size << " bytes) is too big to fit the buffer (" << bufferSize << " bytes), please increase the client buffer size." << endl); 
    212146      resizingBufferStep_=1 ; 
     147      *firstTimeLine[current]=0 ; 
    213148      newBufferSize_=size ; 
    214149      return false ; 
     
    231166      { 
    232167        resizingBufferStep_ = 1 ; 
     168        *firstTimeLine[current]=0 ; 
    233169        newBufferSize_ = (count+size)*growFactor_ ; 
    234170      }   
     
    247183      if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ; 
    248184      *bufferCount[current]=count ; 
    249 /*      info(50)<<"CClientBuffer::getBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current 
    250               <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ; 
    251       if (!winState[current]) info(40)<<"CClientBuffer::getBuffer "<<" Windows Not Locked... "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current 
    252               <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ;*/ 
    253185      return retBuffer; 
    254186    } 
     
    284216    MPI_Status status; 
    285217    int flag; 
     218     
     219    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 
     220    MPI_Win_unlock(clientRank_, windows_[0]) ; 
     221 
     222    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ; 
     223    MPI_Win_unlock(clientRank_, windows_[1]) ; 
    286224 
    287225    if (pending) 
     
    299237      if (count > 0) 
    300238      { 
    301         lockBuffer() ; 
    302  //       if (*control[current]==0 && bufferCount[current] > 0) 
    303         if (*bufferCount[current] > 0) 
     239        double time=MPI_Wtime() ; 
     240        if (time - lastCheckedWithNothing_ > latency_) 
    304241        { 
    305           MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 
    306           if (resizingBufferStep_==3) resizingBufferStep_=0 ; 
    307           pending = true; 
    308 //          *control[current]=0 ; 
    309           *firstTimeLine[current]=0 ; 
    310           *bufferCount[current]=0 ; 
    311  
    312            unlockBuffer() ; 
    313  
    314           if (current == 1) current = 0; 
    315           else current = 1; 
    316           count = 0; 
    317         } 
    318         else  
    319         { 
    320           unlockBuffer() ; 
     242          lockBuffer() ; 
     243          if (*bufferCount[current] > 0) 
     244          { 
     245            MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); 
     246            if (resizingBufferStep_==4) resizingBufferStep_=0 ; 
     247            pending = true; 
     248            *firstTimeLine[current]=0 ; 
     249            *bufferCount[current]=0 ; 
     250 
     251             unlockBuffer() ; 
     252 
     253            if (current == 1) current = 0; 
     254            else current = 1; 
     255            count = 0; 
     256          } 
     257          else  
     258          { 
     259            unlockBuffer() ; 
     260            lastCheckedWithNothing_ = time ; 
     261          } 
    321262        } 
    322263      } 
    323264      else 
    324265      { 
    325         if (resizingBufferStep_==2) resizeBuffer(newBufferSize_) ; 
    326266        if (resizingBufferStep_==1) resizeBufferNotify() ; 
     267        else if (resizingBufferStep_==2) isNotifiedChangeBufferSize() ; 
     268        else if (resizingBufferStep_==3) resizeBuffer(newBufferSize_) ; 
    327269      } 
    328270    } 
     
    345287  void CClientBuffer::resizeBuffer(size_t newSize) 
    346288  { 
     289 
    347290    if (hasWindows) 
    348291    {  
     
    354297 
    355298    bufferSize=newSize ; 
    356     MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ; 
    357     MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ; 
    358     buffer[0] = bufferHeader[0]+headerSize ; 
    359     buffer[1] = bufferHeader[1]+headerSize ; 
    360     firstTimeLine[0]=(size_t*)bufferHeader[0] ; 
    361     firstTimeLine[1]=(size_t*)bufferHeader[1] ; 
    362     bufferCount[0]=(size_t*)bufferHeader[0] +1 ; 
    363     bufferCount[1]=(size_t*)bufferHeader[1] +1 ; 
    364     control[0]=(size_t*)bufferHeader[0] +2 ; 
    365     control[1]=(size_t*)bufferHeader[1] +2 ; 
    366     finalize[0]=(size_t*)bufferHeader[0] +3 ; 
    367     finalize[1]=(size_t*)bufferHeader[1] +3 ; 
     299    MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ; 
     300    MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ; 
     301    buffer[0] = bufferHeader[0]+headerSize_ ; 
     302    buffer[1] = bufferHeader[1]+headerSize_ ; 
     303    firstTimeLine[0]=(size_t*)bufferHeader[0] + timeLineOffset_; 
     304    firstTimeLine[1]=(size_t*)bufferHeader[1] + timeLineOffset_; 
     305    bufferCount[0]=(size_t*)bufferHeader[0] + countOffset_ ; 
     306    bufferCount[1]=(size_t*)bufferHeader[1] + countOffset_ ; 
     307    control[0]=(size_t*)bufferHeader[0] + controlOffset_ ;  // control=0 => nothing ; control=1 => changeBufferSize 
     308    control[1]=(size_t*)bufferHeader[1] + controlOffset_ ; 
     309    notify[0]=(size_t*)bufferHeader[0] + notifyOffset_ ; 
     310    notify[1]=(size_t*)bufferHeader[1] + notifyOffset_ ; 
    368311 
    369312    *firstTimeLine[0]=0 ; 
     
    373316    *control[0]=0 ; 
    374317    *control[1]=0 ; 
    375     *finalize[0]=0 ; 
    376     *finalize[1]=0 ; 
     318    *notify[0] = notifyNothing_ ; 
     319    *notify[1] = notifyNothing_ ; 
    377320    winState[0]=false ; 
    378321    winState[1]=false ; 
     
    382325    {   
    383326     
    384       MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize) ; 
    385       MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize) ; 
     327      MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize_) ; 
     328      MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize_) ; 
    386329           
    387330      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ; 
     
    402345    bufOut->put(this->getWinAddress(1)); 
    403346 
    404     resizingBufferStep_=3; 
    405     unlockBuffer() ; 
     347    resizingBufferStep_=4; 
     348    unlockBuffer() ; 
     349    info(100)<<"CClientBuffer::resizeBuffer(size_t newSize) : resizing buffer of server "<<serverRank<<" ; new size : "<<newSize<<" ; winAdress[0] "<<this->getWinAddress(0)<<" winAdress[1] "<<this->getWinAddress(1)<<endl; 
    406350  } 
    407351 
     
    416360  } 
    417361 
    418   bool CClientBuffer::isNotifiedFinalized(void) 
     362  bool CClientBuffer::isNotifiedChangeBufferSize(void) 
    419363  { 
    420364    
    421365    bool ret ; 
    422366    lockBuffer() ; 
    423     ret=*finalize[current] == 1 ? true : false ; 
     367    ret=*notify[current] == notifyResizeBuffer_ ? true : false ; 
     368    if (ret)  
     369    { 
     370      *notify[current] = notifyNothing_ ; 
     371      resizingBufferStep_=3;   
     372    } 
    424373    unlockBuffer() ; 
    425374 
     
    427376  } 
    428377 
     378  bool CClientBuffer::isNotifiedFinalized(void) 
     379  { 
     380    
     381    bool ret ; 
     382    lockBuffer() ; 
     383    ret=*notify[current] == notifyFinalize_ ? true : false ; 
     384    unlockBuffer() ; 
     385 
     386    return ret; 
     387  } 
     388 
    429389} 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.hpp

    r2130 r2246  
    22#define __BUFFER_CLIENT_HPP__ 
    33 
     4#include "buffer_cs_base.hpp" 
    45#include "xios_spl.hpp" 
    56#include "buffer_out.hpp" 
     
    910namespace xios 
    1011{ 
    11   class CClientBuffer 
     12  class CClientBuffer : public CBufferClientServerBase 
    1213  { 
    1314    public: 
     
    3536       void resizeBuffer(size_t newSize) ; 
    3637       void resizeBufferNotify(void) ; 
     38       bool isNotifiedChangeBufferSize(void) ; 
    3739 
    3840 
     
    4244      size_t* bufferCount[2] ; 
    4345      size_t* control[2] ; 
    44       size_t* finalize[2] ; 
     46      size_t* notify[2] ; 
    4547      bool winState[2] ; 
    4648      int current; 
     
    6769      std::vector<MPI_Win> windows_ ; 
    6870      bool hasWindows ; 
    69       static const int headerSize=4*sizeof(size_t); 
     71 
     72      double latency_=1e-2 ; 
     73      double lastCheckedWithNothing_=0 ; 
    7074  }; 
    7175} 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_server.cpp

    r2130 r2246  
    22#include "exception.hpp" 
    33#include "buffer_server.hpp" 
     4#include "timer.hpp" 
    45 
    56 
     
    3132  } 
    3233 
    33 /* 
    34   void CServerBuffer::createWindows(MPI_Comm oneSidedComm) 
    35   { 
    36     MPI_Barrier(oneSidedComm) ; 
    37     MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ; 
    38     MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ; 
    39     hasWindows=true ; 
    40     updateCurrentWindows() ; 
    41     MPI_Barrier(oneSidedComm) ; 
    42   } 
    43 */ 
    44  
    45 /* 
    46   bool CServerBuffer::freeWindows() 
    47   { 
    48     if (hasWindows) 
    49     { 
    50       size_t header[3] ; 
    51       size_t& control=header[2] ; 
    52       MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows_[0]) ; 
    53       MPI_Get(&control, 1, MPI_LONG_LONG_INT, windowsRank , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[0]) ; 
    54       MPI_Win_unlock(0,windows[0]) ; 
    55       if (control==2)  // ok for free windows 
    56       { 
    57         MPI_Win_free( &(windows[0])) ; 
    58         MPI_Win_free( &(windows[1])) ; 
    59         hasWindows=false ; 
    60         return true ; 
    61       } 
    62       else return false ; 
    63     } 
    64     else return true ; 
    65   } 
    66 */ 
    6734 
    6835  bool CServerBuffer::isBufferFree(size_t count) 
     
    222189  bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count) 
    223190  { 
     191    count = -1 ; 
    224192    if (!hasWindows || resizingBuffer_) return false ; 
    225  
    226      
    227     size_t header[3] ; 
    228     size_t& clientTimeline=header[0] ; 
    229     size_t& clientCount=header[1] ; 
    230     size_t& control=header[2] ; 
     193    double time=MPI_Wtime() ; 
     194    if (time-bufferFromClientTime_ < bufferFromClientLatency_ ) return false; 
     195    bufferFromClientTime_ = time ; 
     196    CTimer::get("getBufferFromClient").resume() ;    
     197    size_t clientTimeline ; 
     198    size_t clientCount ; 
    231199    bool ok=false ; 
    232200     
     
    238206     
    239207    lockBuffer();  
    240  
     208    CTimer::get("getBufferFromClient_locked").resume() ;    
    241209// lock is acquired 
    242210 
    243     MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],0), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
    244     MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     211    MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     212    MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
    245213    MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 
    246214 
    247 //    control=1 ; 
    248 //    MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
    249     
    250 //    MPI_Win_unlock(windowsRank_, windows_[currentWindows]) ; 
    251     MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 
    252 //    info(100)<<"getBufferFromClient : windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ; 
    253215    if (timeLine==clientTimeline) 
    254216    { 
    255 //      info(50)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ; 
    256   
    257 //      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 
    258217      buffer=(char*)getBuffer(clientCount) ; 
    259218      count=clientCount ; 
     
    261220      clientTimeline = 0 ; 
    262221      clientCount = 0 ; 
    263 //      control=0 ; 
    264       MPI_Put(&header[0], 2, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],0) , 2, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     222      MPI_Put(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     223      MPI_Put(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
    265224 
    266225// release lock 
    267      unlockBuffer() ; 
     226      CTimer::get("getBufferFromClient_locked").suspend() ;    
     227      unlockBuffer() ; 
    268228 
    269229      ok=true ; 
     
    283243    else 
    284244    { 
    285       //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 
    286       //control=0 ; 
    287       //MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     245      count=0 ; 
    288246  
    289247 // release lock 
     248      CTimer::get("getBufferFromClient_locked").suspend() ;  
    290249      unlockBuffer() ; 
    291250    } 
    292  
     251    CTimer::get("getBufferFromClient").suspend() ;    
    293252    if (ok) return true ; 
    294253 
     
    299258  { 
    300259    if (!hasWindows) return ; 
    301  
    302     long long int lock=1 ; 
    303     long long int zero=0, one=1 ; 
    304 //    control=1 ; 
    305260    MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ; 
    306     while(lock!=0) 
    307     { 
    308       MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 
    309                            windows_[currentWindows]) ; 
    310       MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 
    311     } 
    312261  } 
    313262 
     
    315264  { 
    316265    if (!hasWindows) return ; 
    317     long long int lock=1 ; 
    318     long long int zero=0, one=1 ; 
    319      
    320     MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 
    321                           windows_[currentWindows]) ; 
    322     MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;  
    323266    MPI_Win_unlock(windowsRank_,windows_[currentWindows]) ; 
    324267  } 
     
    327270  { 
    328271    if (!hasWindows) return ; 
    329     size_t finalize=1 ; 
     272    size_t notify=notifyFinalize_ ; 
    330273    lockBuffer();  
    331274// lock is acquired 
    332     MPI_Put(&finalize, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],3*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     275    MPI_Put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
    333276    unlockBuffer() ; 
    334277  } 
     278 
     279  void CServerBuffer::notifyBufferResizing(void) 
     280  { 
     281    resizingBuffer_=true ; 
     282    if (!hasWindows) return ; 
     283    size_t notify=notifyResizeBuffer_ ; 
     284    lockBuffer();  
     285// lock is acquired 
     286    MPI_Put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ; 
     287    unlockBuffer() ; 
     288  } 
    335289} 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_server.hpp

    r2130 r2246  
    22#define __BUFFER_SERVER_HPP__ 
    33 
     4#include "buffer_cs_base.hpp" 
    45#include "xios_spl.hpp" 
    56#include "buffer.hpp" 
     
    910namespace xios 
    1011{ 
    11   class CServerBuffer 
     12  class CServerBuffer : public CBufferClientServerBase 
    1213  { 
    1314    public: 
     
    2627      void unlockBuffer(void) ; 
    2728      void notifyClientFinalize(void) ; 
    28       void notifyBufferResizing(void) { resizingBuffer_=true ;} 
     29      void notifyBufferResizing(void) ; 
    2930    private: 
    3031      char* buffer; 
     
    4041      bool hasWindows ; 
    4142      int windowsRank_ ; 
     43      double bufferFromClientLatency_=1e-1 ; 
     44      double bufferFromClientTime_ = 0; 
     45 
    4246  }; 
    4347} 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_client.cpp

    r2240 r2246  
    5050      computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader); 
    5151 
    52       if (flag)  
    53       { 
    54         MPI_Intercomm_merge(interComm_,false, &interCommMerged) ; 
    55         int interCommMergedRank; 
    56         MPI_Comm_rank(interComm_, &interCommMergedRank); 
    57         MPI_Comm_rank(interCommMerged, &interCommMergedRank); 
    58         MPI_Comm_rank(intraComm, &interCommMergedRank); 
    59       } 
     52      if (flag) MPI_Intercomm_merge(interComm_,false, &interCommMerged) ; 
    6053       
    6154      if (!isAttachedModeEnabled()) 
    6255      {   
     56 
     57        CTimer::get("create Windows").resume() ; 
     58 
     59        // We create dummy pair of intercommunicator between clients and server 
     60        // Why ? Just because on openMPI, it reduce the creation time of windows otherwhise which increase quadratically 
     61        // We don't know the reason 
     62       
     63        MPI_Comm commSelf ; 
     64        MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ; 
     65        vector<MPI_Comm> dummyComm(serverSize) ; 
     66        for(int rank=0; rank<serverSize; rank++) MPI_Intercomm_create(commSelf, 0, interCommMerged, clientSize+rank, 0, &dummyComm[rank]) ; 
     67 
     68        // create windows for one-sided 
    6369        windows.resize(serverSize) ; 
    6470        MPI_Comm winComm ; 
     
    6773          windows[rank].resize(2) ; 
    6874          MPI_Comm_split(interCommMerged, rank, clientRank, &winComm); 
    69           int myRank ; 
    70           MPI_Comm_rank(winComm,&myRank); 
    7175          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][0]); 
    7276          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[rank][1]); 
    7377//       ym : Warning : intelMPI doesn't support that communicator of windows be deallocated before the windows deallocation, crash at MPI_Win_lock 
    7478//            Bug or not ?           
    75 //        MPI_Comm_free(&winComm) ; 
    76         } 
    77       } 
    78  
    79       MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ; 
     79//          MPI_Comm_free(&winComm) ; 
     80        } 
     81         
     82        // free dummy intercommunicator => take times ? 
     83        for(int rank=0; rank<serverSize; rank++)  MPI_Comm_free(&dummyComm[rank]) ; 
     84        MPI_Comm_free(&commSelf) ; 
     85 
     86        CTimer::get("create Windows").resume() ; 
     87     } 
    8088 
    8189      auto time=chrono::system_clock::now().time_since_epoch().count() ; 
     
    281289      } 
    282290 
     291      double lastTimeBuffersNotFree=0. ; 
     292      double time ; 
     293      bool doUnlockBuffers ; 
    283294      CTimer::get("Blocking time").resume(); 
    284295      do 
    285296      { 
    286297        areBuffersFree = true; 
    287         for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
    288         { 
    289           areBuffersFree &= (*itBuffer)->isBufferFree(*itSize); 
    290         } 
     298        doUnlockBuffers=false ; 
     299        time=MPI_Wtime() ; 
     300        if (time-lastTimeBuffersNotFree > latency_) 
     301        { 
     302          for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
     303          { 
     304            areBuffersFree &= (*itBuffer)->isBufferFree(*itSize); 
     305          } 
     306          if (!areBuffersFree) 
     307          { 
     308            lastTimeBuffersNotFree = time ; 
     309            doUnlockBuffers=true ; 
     310          }           
     311        } 
     312        else areBuffersFree = false ; 
    291313 
    292314        if (!areBuffersFree) 
    293315        { 
    294           for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer(); 
     316          if (doUnlockBuffers) for (itBuffer = bufferList.begin(); itBuffer != bufferList.end(); itBuffer++) (*itBuffer)->unlockBuffer(); 
    295317          checkBuffers(); 
    296 /*           
    297           context->server->listen(); 
    298  
    299           if (context->serverPrimServer.size()>0) 
    300           { 
    301             for (int i = 0; i < context->serverPrimServer.size(); ++i)  context->serverPrimServer[i]->listen(); 
    302  //ym           CServer::contextEventLoop(false) ; // avoid dead-lock at finalize... 
    303             context->globalEventLoop() ; 
    304           } 
    305 */ 
    306            context_->globalEventLoop() ; 
     318 
     319          context_->globalEventLoop() ; 
    307320        } 
    308321 
     
    383396        } 
    384397      }  
    385  
    386398   } 
    387399 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_client.hpp

    r2130 r2246  
    9494      MPI_Comm intraComm; //!< Communicator of client group 
    9595 
    96       MPI_Comm commSelf; //!< Communicator of the client alone. Needed to create a new communicator between 1 proc client and 1 proc server for one sided communication 
    97  
    9896      map<int,CClientBuffer*> buffers; //!< Buffers for connection to servers 
    9997 
     
    126124      CContextServer* associatedServer_ ; //!< The server associated to the pair client/server 
    127125      bool isGrowableBuffer_ = true ; 
     126 
     127      double latency_=1e-2 ; 
    128128  }; 
    129129} 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp

    r2240 r2246  
    4646    else  attachedMode=true ; 
    4747     
    48     if (flag) MPI_Comm_remote_size(interComm,&commSize); 
    49     else  MPI_Comm_size(interComm,&commSize); 
     48    int clientSize ; 
     49    if (flag) MPI_Comm_remote_size(interComm,&clientSize); 
     50    else  MPI_Comm_size(interComm,&clientSize); 
    5051 
    5152    
     
    7576    if (!isAttachedModeEnabled()) 
    7677    { 
     78      CTimer::get("create Windows").resume() ; 
     79 
    7780      MPI_Intercomm_merge(interComm_,true,&interCommMerged) ; 
    78 // create windows for one sided comm 
    79       int interCommMergedRank; 
     81 
     82      // We create dummy pair of intercommunicator between clients and server 
     83      // Why ? Just because on openMPI, it reduce the creation time of windows otherwhise which increase quadratically 
     84      // We don't know the reason 
     85      MPI_Comm commSelf ; 
     86      MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf) ; 
     87      vector<MPI_Comm> dummyComm(clientSize) ; 
     88      for(int rank=0; rank<clientSize ; rank++) MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0 , &dummyComm[rank]) ; 
     89 
     90      // create windows for one sided comm 
    8091      MPI_Comm winComm ; 
    81       MPI_Comm_rank(intraComm, &interCommMergedRank); 
    8292      windows.resize(2) ; 
    83       for(int rank=commSize; rank<commSize+intraCommSize; rank++) 
    84       { 
    85         if (rank==commSize+interCommMergedRank)  
     93      for(int rank=clientSize; rank<clientSize+intraCommSize; rank++) 
     94      { 
     95        if (rank==clientSize+intraCommRank)  
    8696        { 
    87           MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm); 
    88           int myRank ; 
    89           MPI_Comm_rank(winComm,&myRank); 
     97          MPI_Comm_split(interCommMerged, intraCommRank, rank, &winComm); 
    9098          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[0]); 
    91           MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]);       
     99          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]);    
    92100        } 
    93         else MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm); 
    94 //       ym : Warning : intelMPI doesn't support that communicator of windows be deallocated before the windows deallocation, crash at MPI_Win_lock 
    95 //            Bug or not ?           
    96         // MPI_Comm_free(&winComm) ; 
    97       } 
     101        else MPI_Comm_split(interCommMerged, intraCommRank, rank, &winComm); 
     102        //       ym : Warning : intelMPI doesn't support that communicator of windows be deallocated before the windows deallocation, crash at MPI_Win_lock 
     103        //            Bug or not ?           
     104        //         MPI_Comm_free(&winComm) ; 
     105      } 
     106       
     107      // free dummy intercommunicator 
     108      for(int rank=0; rank<clientSize ; rank++)  MPI_Comm_free(&dummyComm[rank]) ; 
     109      MPI_Comm_free(&commSelf) ; 
     110      CTimer::get("create Windows").suspend() ; 
    98111    } 
    99112    else  
     
    103116      windows[1]=MPI_WIN_NULL ; 
    104117    } 
    105  
    106  
    107118     
    108     MPI_Comm_split(intraComm_,intraCommRank,intraCommRank, &commSelf) ; 
    109119    itLastTimeLine=lastTimeLine.begin() ; 
    110120 
     
    138148  bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/) 
    139149  { 
     150    CTimer::get("listen request").resume(); 
    140151    listen(); 
     152    CTimer::get("listen request").suspend(); 
     153    CTimer::get("check pending request").resume(); 
    141154    checkPendingRequest(); 
     155    checkPendingProbe() ; 
     156    CTimer::get("check pending request").suspend(); 
     157    CTimer::get("check event process").resume(); 
    142158    if (enableEventsProcessing)  processEvents(); 
     159    CTimer::get("check event process").suspend(); 
    143160    return finished; 
    144161  } 
    145  
     162/* 
    146163  void CContextServer::listen(void) 
    147164  { 
     
    221238    } 
    222239  } 
     240*/ 
     241 
     242 void CContextServer::listen(void) 
     243  { 
     244    int rank; 
     245    int flag; 
     246    int count; 
     247    char * addr; 
     248    MPI_Status status; 
     249    MPI_Message message ; 
     250    map<int,CServerBuffer*>::iterator it; 
     251    bool okLoop; 
     252 
     253    traceOff(); 
     254    MPI_Improbe(MPI_ANY_SOURCE, 20,interComm,&flag,&message, &status); 
     255    traceOn(); 
     256    if (flag==true) listenPendingRequest(message, status) ; 
     257  } 
     258 
     259  bool CContextServer::listenPendingRequest( MPI_Message &message, MPI_Status& status) 
     260  { 
     261    int count; 
     262    char * addr; 
     263    map<int,CServerBuffer*>::iterator it; 
     264    int rank=status.MPI_SOURCE ; 
     265 
     266    it=buffers.find(rank); 
     267    if (it==buffers.end()) // Receive the buffer size and allocate the buffer 
     268    { 
     269       MPI_Aint recvBuff[4] ; 
     270       MPI_Mrecv(recvBuff, 4, MPI_AINT,  &message, &status); 
     271       remoteHashId_ = recvBuff[0] ; 
     272       StdSize buffSize = recvBuff[1]; 
     273       vector<MPI_Aint> winAdress(2) ; 
     274       winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ; 
     275       mapBufferSize_.insert(std::make_pair(rank, buffSize)); 
     276       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first; 
     277       lastTimeLine[rank]=0 ; 
     278       itLastTimeLine=lastTimeLine.begin() ; 
     279       return true; 
     280    } 
     281    else 
     282    { 
     283        std::pair<MPI_Message,MPI_Status> mypair(message,status) ; 
     284        pendingProbe[rank].push_back(mypair) ; 
     285        return false; 
     286    } 
     287  } 
     288 
     289  void CContextServer::checkPendingProbe(void) 
     290  { 
     291     
     292    list<int> recvProbe ; 
     293    list<int>::iterator itRecv ; 
     294    map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe; 
     295 
     296    for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++) 
     297    { 
     298      int rank=itProbe->first ; 
     299      if (pendingRequest.count(rank)==0) 
     300      { 
     301        MPI_Message& message = itProbe->second.front().first ; 
     302        MPI_Status& status = itProbe->second.front().second ; 
     303        int count ; 
     304        MPI_Get_count(&status,MPI_CHAR,&count); 
     305        map<int,CServerBuffer*>::iterator it = buffers.find(rank); 
     306        if (it->second->isBufferFree(count)) 
     307        { 
     308          char * addr; 
     309          addr=(char*)it->second->getBuffer(count); 
     310          MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]); 
     311          bufferRequest[rank]=addr; 
     312          recvProbe.push_back(rank) ; 
     313          itProbe->second.pop_front() ; 
     314        } 
     315      } 
     316    } 
     317 
     318    for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ; 
     319  } 
    223320 
    224321 
     
    232329    int count; 
    233330    MPI_Status status; 
     331    
     332    if (!pendingRequest.empty()) CTimer::get("receiving requests").resume(); 
     333    else CTimer::get("receiving requests").suspend(); 
    234334 
    235335    for(it=pendingRequest.begin();it!=pendingRequest.end();it++) 
     
    257357  void CContextServer::getBufferFromClient(size_t timeLine) 
    258358  { 
     359    CTimer::get("CContextServer::getBufferFromClient").resume() ; 
    259360    if (!isAttachedModeEnabled()) // one sided desactivated in attached mode 
    260361    {   
     
    267368      { 
    268369        rank=itLastTimeLine->first ; 
    269         if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0) 
     370        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty()) 
    270371        { 
    271           if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) 
    272           { 
    273             processRequest(rank, buffer, count); 
    274             break ; 
    275           } 
     372          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count); 
     373          if (count >= 0) break ; 
    276374        } 
    277375      } 
    278376    } 
     377    CTimer::get("CContextServer::getBufferFromClient").suspend() ; 
    279378  } 
    280379          
     
    388487        } 
    389488      } 
    390       else getBufferFromClient(currentTimeLine) ; 
     489      else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ; 
    391490    } 
    392491    else if (pureOneSided) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line 
     
    441540//      releaseBuffers() ; 
    442541      notifyClientsFinalize() ; 
     542      CTimer::get("receiving requests").suspend(); 
    443543      context->finalize(); 
    444544 
     
    446546      MPI_Win_free(&windows[0]) ; 
    447547      MPI_Win_free(&windows[1]) ; 
    448       
     548 
    449549      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(), 
    450550                           iteMap = mapBufferSize_.end(), itMap; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.hpp

    r2230 r2246  
    1919    bool eventLoop(bool enableEventsProcessing = true); 
    2020    void listen(void) ; 
    21     bool listenPendingRequest(MPI_Status& status) ; 
     21//    bool listenPendingRequest(MPI_Status& status) ; 
     22    bool listenPendingRequest(MPI_Message &message, MPI_Status& status) ; 
     23    void checkPendingProbe(void) ; 
    2224    void checkPendingRequest(void) ; 
    2325    void getBufferFromClient(size_t timeLine) ; 
     
    4244    MPI_Comm interCommMerged; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication. 
    4345 
    44     MPI_Comm commSelf; //!< Communicator of the server alone. Needed to create a new communicator between 1 proc client and 1 proc server for one sided communication 
    45  
    4646    map<int,CServerBuffer*> buffers ; 
    4747    map<int,size_t> lastTimeLine ; //!< last event time line for a processed request 
    4848    map<int,size_t>::iterator itLastTimeLine ; //!< iterator on lastTimeLine 
     49    map<int, list<std::pair<MPI_Message,MPI_Status> > > pendingProbe; 
    4950    map<int,MPI_Request> pendingRequest ; 
    5051    map<int,char*> bufferRequest ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/distribution/distribution_type.hpp

    r1930 r2246  
    44namespace xios 
    55{ 
    6   enum class EDistributionType  { NONE=0, ROOT, BANDS, BLOCKS} ; 
     6  enum class EDistributionType  { NONE=0, ROOT, BANDS, BLOCKS, COLUMNS} ; 
    77} 
    88 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/contexts_manager.cpp

    r1765 r2246  
    77#include "servers_ressource.hpp" 
    88#include "server.hpp" 
     9#include "timer.hpp" 
    910#include <functional> 
    1011 
     
    4849    int serviceLeader ; 
    4950    auto servicesManager = CXios::getServicesManager() ; 
    50      
     51    
    5152    bool ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ; 
    5253 
     54    info(40)<<"CContextsManager::createServerContext : waiting for service leader ;  serviceId : "<<serviceId<<endl ; 
    5355    if (wait) 
    5456    { 
     
    6466      notifyType_=NOTIFY_CREATE_CONTEXT ; 
    6567      notifyCreateContext_=make_tuple(poolId, serviceId, partitionId, contextId) ; 
     68      info(40)<<"CContextsManager::createServerContext : notification create_context to service leader "<<serviceLeader<<", serviceId : "<<serviceId<<", contextId "<<contextId<<endl ; 
    6669      sendNotification(serviceLeader) ; 
    6770      return true ; 
     
    8083     
    8184    int type ; 
     85    info(40)<<"CContextsManager::createServerContextIntercomm : waiting for context leader ;  contextId : "<<contextId<<endl ; 
    8286    ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ; 
    8387    if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ; 
     
    96100      notifyType_=NOTIFY_CREATE_INTERCOMM ; 
    97101      notifyCreateIntercomm_=make_tuple(poolId, serviceId, partitionId, contextId, remoteLeader, sourceContext) ; 
     102      info(40)<<"CContextsManager::createServerContextIntercomm : notification create_intercomm to context leader : "<<contextLeader<<", contextId :"<<contextId<<endl ; 
    98103      sendNotification(contextLeader) ; 
    99104      return true ; 
     
    149154  void CContextsManager::eventLoop(void) 
    150155  { 
    151     checkNotifications() ; 
     156    CTimer::get("CContextsManager::eventLoop").resume(); 
     157    double time=MPI_Wtime() ; 
     158    if (time-lastEventLoop_ > eventLoopLatency_)  
     159    { 
     160      checkNotifications() ; 
     161      lastEventLoop_=time ; 
     162    } 
     163    CTimer::get("CContextsManager::eventLoop").suspend(); 
    152164  } 
    153165   
     
    166178  void CContextsManager::createServerContext(void) 
    167179  { 
     180    info(40)<<"CContextsManager::createServerContext : receive create server context notification"<<endl ; 
    168181    auto arg=notifyCreateContext_ ; 
    169182    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg)) 
     
    174187  void CContextsManager::createServerContextIntercomm(void) 
    175188  { 
     189    info(40)<<"CContextsManager::createServerContext : receive create intercomm context notification"<<endl ; 
    176190    auto arg=notifyCreateIntercomm_ ; 
    177191    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg)) 
     
    194208  void CContextsManager::registerContext(const string& fullContextId, const SRegisterContextInfo& contextInfo) 
    195209  { 
    196     winContexts_->lockWindow(managerGlobalLeader_,0) ; 
    197     winContexts_->updateFromWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ; 
     210    winContexts_->lockWindowExclusive(managerGlobalLeader_) ; 
     211    winContexts_->updateFromLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ; 
     212    winContexts_->flushWindow(managerGlobalLeader_) ; 
    198213    contexts_[fullContextId] = contextInfo ; 
    199     winContexts_->updateToWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpOut) ; 
    200     winContexts_->unlockWindow(managerGlobalLeader_,0) ;     
     214    winContexts_->updateToLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpOut) ; 
     215    winContexts_->unlockWindow(managerGlobalLeader_) ; 
    201216  } 
    202217 
     
    210225    { 
    211226 
    212       winContexts_->lockWindow(managerGlobalLeader_,0) ; 
    213       winContexts_->updateFromWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ; 
    214       winContexts_->unlockWindow(managerGlobalLeader_,0) ; 
     227      winContexts_->lockWindowShared(managerGlobalLeader_) ; 
     228      winContexts_->updateFromLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ; 
     229      winContexts_->unlockWindow(managerGlobalLeader_) ; 
    215230 
    216231      auto it=contexts_.find(fullContextId) ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/contexts_manager.hpp

    r1765 r2246  
    8282 
    8383    int managerGlobalLeader_ ; 
    84      
     84 
     85    const double eventLoopLatency_=1e-2;  
     86    double lastEventLoop_=0. ; 
     87 
    8588    friend class CWindowManager ; 
    8689  
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/daemons_manager.cpp

    r2209 r2246  
    4141  bool CDaemonsManager::eventLoop(void) 
    4242  { 
     43     
    4344    CXios::getRessourcesManager()->eventLoop() ; 
    4445    CXios::getServicesManager()->eventLoop() ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/pool_ressource.cpp

    r2208 r2246  
    66#include "type.hpp" 
    77#include "cxios.hpp" 
     8#include "timer.hpp" 
    89 
    910namespace xios 
     
    1617    MPI_Comm_rank(poolComm, &commRank) ; 
    1718    MPI_Comm_size(poolComm, &commSize) ; 
    18  
    19  
     19    info(40)<<"CPoolRessource::CPoolRessource  : creating new pool : "<<Id<<endl ; 
    2020    if (commRank==localLeader_) 
    2121    { 
     
    5151    occupancy_.erase(occupancy_.begin(),it) ; 
    5252    occupancy_.insert(procs_update.begin(),procs_update.end()) ; 
    53  
     53     
     54    info(40)<<"CPoolRessource::createService  : notify createService to all pool members ; serviceId : "<<serviceId<<endl ; 
    5455    for(int rank=0; rank<commSize; rank++) 
    5556    { 
     
    102103  bool CPoolRessource::eventLoop(bool serviceOnly) 
    103104  { 
    104     checkCreateServiceNotification() ; 
     105    CTimer::get("CPoolRessource::eventLoop").resume(); 
     106    
     107    double time=MPI_Wtime() ; 
     108    if (time-lastEventLoop_ > eventLoopLatency_)  
     109    { 
     110      checkCreateServiceNotification() ; 
     111      lastEventLoop_=time ; 
     112    } 
     113     
    105114    for (auto it=services_.begin(); it!=services_.end() ; ++it)  
    106115    { 
     
    112121      } 
    113122    } 
    114  
     123    CTimer::get("CPoolRessource::eventLoop").suspend(); 
    115124    if (services_.empty() && finalizeSignal_) return true ; 
    116125    else return false ; 
     
    137146  void CPoolRessource::createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in) 
    138147  { 
     148      
     149     info(40)<<"CPoolRessource::createNewService  : receive createService notification ; serviceId : "<<serviceId<<endl ; 
    139150     MPI_Comm serviceComm, newServiceComm ; 
    140151     int commRank ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/pool_ressource.hpp

    r1764 r2246  
    4444    std::string Id_ ; 
    4545    bool finalizeSignal_ ; 
    46  
     46     
     47    const double eventLoopLatency_=1e-2;  
     48    double lastEventLoop_=0. ; 
    4749  }; 
    4850 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/ressources_manager.cpp

    r1764 r2246  
    22#include "server.hpp" 
    33#include "servers_ressource.hpp" 
     4#include "timer.hpp" 
    45 
    56 
     
    4344  void CRessourcesManager::createPool(const string& poolId, int size) 
    4445  { 
     46    info(40)<<"CRessourcesManager::createPool : calling createPool : "<<poolId<<"  of size"<<size<<endl ; 
     47    info(40)<<"send notification to leader : "<<serverLeader_<<endl ; 
    4548    winRessources_->lockWindow(managerGlobalLeader_,0) ; 
    4649    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ; 
     
    4952    notifyType_=NOTIFY_CREATE_POOL ; 
    5053    notifyCreatePool_=make_tuple(poolId, size) ; 
     54    info(40)<<"CRessourcesManager::createPool : send notification creating pool to server leader "<<serverLeader_<<endl ; 
    5155    sendNotification(serverLeader_) ;  
    5256  } 
     
    6165    { 
    6266      notifyType_=NOTIFY_FINALIZE ; 
     67      info(40)<<"CRessourcesManager::finalize : send notification finalize to server leader "<<serverLeader_<<endl ; 
    6368      sendNotification(serverLeader_) ; 
    6469    }  
     
    107112  void CRessourcesManager::eventLoop(void) 
    108113  { 
    109     checkNotifications() ; 
     114    CTimer::get("CRessourcesManager::eventLoop").resume(); 
     115    double time=MPI_Wtime() ; 
     116    if (time-lastEventLoop_ > eventLoopLatency_)  
     117    { 
     118      checkNotifications() ; 
     119      lastEventLoop_=time ; 
     120    } 
     121 
     122    CTimer::get("CRessourcesManager::eventLoop").suspend(); 
    110123  } 
    111124   
     
    114127    int commRank ; 
    115128    MPI_Comm_rank(xiosComm_, &commRank) ; 
     129    CTimer::get("CRessourcesManager::checkNotifications lock").resume(); 
    116130    winNotify_->lockWindow(commRank,0) ; 
     131    CTimer::get("CRessourcesManager::checkNotifications lock").suspend(); 
     132    CTimer::get("CRessourcesManager::checkNotifications pop").resume(); 
    117133    winNotify_->popFromWindow(commRank, this, &CRessourcesManager::notificationsDumpIn) ; 
     134    CTimer::get("CRessourcesManager::checkNotifications pop").suspend(); 
     135    CTimer::get("CRessourcesManager::checkNotifications unlock").resume(); 
    118136    winNotify_->unlockWindow(commRank,0) ; 
     137    CTimer::get("CRessourcesManager::checkNotifications unlock").suspend(); 
    119138    if (notifyType_==NOTIFY_CREATE_POOL) createPool() ; 
    120139    else if (notifyType_==NOTIFY_FINALIZE) finalizeSignal() ; 
     
    123142  void CRessourcesManager::createPool(void) 
    124143  { 
     144     
    125145    auto& arg=notifyCreatePool_ ; 
    126146    string poolId=get<0>(arg) ; 
    127147    int size=get<1>(arg) ; 
     148    info(40)<<"CRessourcesManager::createPool : receive create pool notification : "<< poolId<<"  of size "<<size<<endl ; 
    128149    CServer::getServersRessource()->createPool(poolId,size) ; 
    129150  }  
     
    131152  void CRessourcesManager::finalizeSignal(void) 
    132153  { 
     154    info(40)<<"CRessourcesManager::createPool : receive finalize notification"<<endl ; 
    133155    CServer::getServersRessource()->finalize() ; 
    134156  } 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/ressources_manager.hpp

    r1764 r2246  
    7272    int freeRessourcesSize_ ; 
    7373 
     74    const double eventLoopLatency_=1e-2;  
     75    double lastEventLoop_=0. ; 
     76 
    7477    friend class CWindowManager ; 
    7578  } ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/server_context.cpp

    r2230 r2246  
    66#include "register_context_info.hpp" 
    77#include "services.hpp" 
     8#include "timer.hpp" 
    89 
    910 
     
    1819                                 hasNotification_(false) 
    1920  { 
     21   info(40)<<"CCServerContext::CServerContext  : new context creation ; contextId : "<<contextId<<endl ; 
    2022   int localRank, globalRank, commSize ; 
    2123 
     
    5860                                       const MPI_Comm& intraComm, MPI_Comm& interCommClient, MPI_Comm& interCommServer, bool wait) 
    5961  { 
     62    info(40)<<"CServerContext::createIntercomm  : context intercomm creation ; contextId : "<<contextId<<endl ; 
    6063    int intraCommRank ; 
    6164    MPI_Comm_rank(intraComm, &intraCommRank) ; 
     
    145148     int commSize ; 
    146149     MPI_Comm_size(contextComm_,&commSize) ; 
     150     info(40)<<"CServerContext::createIntercomm  : notify createContextIntercomm to all context members ; sourceContext : "<<sourceContext<<endl ; 
     151     
    147152     for(int rank=0; rank<commSize; rank++) 
    148153     { 
     
    191196    if (!hasNotification_) 
    192197    { 
    193       int commRank ; 
    194       MPI_Comm_rank(contextComm_, &commRank) ; 
    195       winNotify_->lockWindow(commRank,0) ; 
    196       winNotify_->popFromWindow(commRank, this, &CServerContext::notificationsDumpIn) ; 
    197       winNotify_->unlockWindow(commRank,0) ; 
     198      double time=MPI_Wtime() ; 
     199      if (time-lastEventLoop_ > eventLoopLatency_)  
     200      { 
     201        int commRank ; 
     202        MPI_Comm_rank(contextComm_, &commRank) ; 
     203        winNotify_->lockWindow(commRank,0) ; 
     204        winNotify_->popFromWindow(commRank, this, &CServerContext::notificationsDumpIn) ; 
     205        winNotify_->unlockWindow(commRank,0) ; 
    198206       
    199       if (notifyInType_!= NOTIFY_NOTHING) 
    200       { 
    201         hasNotification_=true ; 
    202         auto eventScheduler=parentService_->getEventScheduler() ; 
    203         std::hash<string> hashString ; 
    204         size_t hashId = hashString(name_) ; 
    205         size_t currentTimeLine=0 ; 
    206         eventScheduler->registerEvent(currentTimeLine,hashId);  
     207        if (notifyInType_!= NOTIFY_NOTHING) 
     208        { 
     209          hasNotification_=true ; 
     210          auto eventScheduler=parentService_->getEventScheduler() ; 
     211          std::hash<string> hashString ; 
     212          size_t hashId = hashString(name_) ; 
     213          size_t currentTimeLine=0 ; 
     214          eventScheduler->registerEvent(currentTimeLine,hashId);  
     215        } 
     216        lastEventLoop_=time ; 
    207217      } 
    208218    } 
     
    225235  bool CServerContext::eventLoop(bool serviceOnly) 
    226236  { 
     237    CTimer::get("CServerContext::eventLoop").resume(); 
    227238    bool finished=false ; 
    228     if (winNotify_!=nullptr) checkNotifications() ; 
     239     
     240//    double time=MPI_Wtime() ; 
     241//    if (time-lastEventLoop_ > eventLoopLatency_)  
     242//    { 
     243      if (winNotify_!=nullptr) checkNotifications() ; 
     244//      lastEventLoop_=time ; 
     245//    } 
     246 
     247 
    229248    if (!serviceOnly && context_!=nullptr)   
    230249    { 
     
    235254      } 
    236255    } 
    237  
     256    CTimer::get("CServerContext::eventLoop").suspend(); 
    238257    if (context_==nullptr && finalizeSignal_) finished=true ; 
    239258    return finished ; 
     
    242261  void CServerContext::createIntercomm(void) 
    243262  { 
     263    info(40)<<"CServerContext::createIntercomm  : received createIntercomm notification"<<endl ; 
     264 
    244265     MPI_Comm interCommServer, interCommClient ; 
    245266     auto& arg=notifyInCreateIntercomm_ ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/server_context.hpp

    r2130 r2246  
    6161    bool isAttachedMode_ ; 
    6262 
     63    const double eventLoopLatency_=1e-2;  
     64    double lastEventLoop_=0. ; 
     65 
    6366    friend class CWindowManager ; 
    6467  } ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/servers_ressource.cpp

    r2220 r2246  
    55#include "cxios.hpp" 
    66#include "mpi.hpp" 
     7#include "timer.hpp" 
    78#include <vector> 
    89#include <string> 
     
    115116  bool CServersRessource::eventLoop(bool serviceOnly) 
    116117  { 
    117     checkNotifications() ; 
     118    CTimer::get("CServersRessource::eventLoop").resume(); 
     119    double time=MPI_Wtime() ; 
     120    if (time-lastEventLoop_ > eventLoopLatency_)  
     121    { 
     122      checkNotifications() ; 
     123      lastEventLoop_=time ; 
     124    } 
     125 
    118126    if (poolRessource_!=nullptr)  
    119127    { 
     
    124132      }  
    125133    } 
    126  
     134    CTimer::get("CServersRessource::eventLoop").suspend(); 
    127135    if (poolRessource_==nullptr && finalizeSignal_) return true ; 
    128136    else return false ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/servers_ressource.hpp

    r1764 r2246  
    5050    bool finalizeSignal_ ; 
    5151 
     52    const double eventLoopLatency_=1e-2;  
     53    double lastEventLoop_=0. ; 
     54 
    5255    friend class CWindowManager ; 
    5356  } ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services.cpp

    r2230 r2246  
    55#include "server_context.hpp" 
    66#include "event_scheduler.hpp" 
     7#include "timer.hpp" 
    78 
    89namespace xios 
     
    1415 
    1516  { 
     17    info(40)<<"CService::CService  : new service created ; serviceId : "<<serviceId<<endl ; 
     18    
    1619    int localRank, globalRank, commSize ; 
    1720 
     
    4447    int commSize ; 
    4548    MPI_Comm_size(serviceComm_, &commSize) ; 
    46      
     49    info(40)<<"CService::createContext  : notify CreateContext to all services members ; serviceId : "<<serviceId<<" ; contextId : "<<contextId<<endl ; 
     50 
    4751    for(int rank=0; rank<commSize; rank++)  
    4852    { 
     
    5155      sendNotification(rank) ; 
    5256    } 
     57    info(40)<<"CService::createContext  : notify CreateContext to all services members : DONE "<<endl ; 
    5358  } 
    5459/* 
     
    99104  { 
    100105    //checkCreateContextNotification() ; 
    101     checkNotifications() ; 
     106    CTimer::get("CService::eventLoop").resume(); 
     107     
     108//    double time=MPI_Wtime() ; 
     109//    if (time-lastEventLoop_ > eventLoopLatency_)  
     110//    { 
     111      checkNotifications() ; 
     112//      lastEventLoop_=time ; 
     113//    } 
     114 
    102115 
    103116    eventScheduler_->checkEvent() ; 
     
    111124      } ; 
    112125    } 
    113  
     126    CTimer::get("CService::eventLoop").suspend(); 
    114127    if (contexts_.empty() && finalizeSignal_) return true ; 
    115128    else return false ; 
     
    144157      if (notifyInType_==NOTIFY_CREATE_CONTEXT) 
    145158      { 
    146         info(10)<<"NotifyDumpOut"<<endl ; 
    147159        auto& arg=notifyInCreateContext_ ; 
    148160        buffer >> std::get<0>(arg)>> std::get<1>(arg) >> std::get<2>(arg)>> std::get<3>(arg); 
     
    158170    if (!hasNotification_) 
    159171    { 
    160       int commRank ; 
    161       MPI_Comm_rank(serviceComm_, &commRank) ; 
    162       winNotify_->lockWindow(commRank,0) ; 
    163       winNotify_->popFromWindow(commRank, this, &CService::notificationsDumpIn) ; 
    164       winNotify_->unlockWindow(commRank,0) ; 
     172      double time=MPI_Wtime() ; 
     173      if (time-lastEventLoop_ > eventLoopLatency_)  
     174      { 
     175        int commRank ; 
     176        MPI_Comm_rank(serviceComm_, &commRank) ; 
     177        winNotify_->lockWindow(commRank,0) ; 
     178        winNotify_->popFromWindow(commRank, this, &CService::notificationsDumpIn) ; 
     179        winNotify_->unlockWindow(commRank,0) ; 
    165180       
    166       if (notifyInType_!= NOTIFY_NOTHING) 
    167       { 
    168         hasNotification_=true ; 
    169         std::hash<string> hashString ; 
    170         size_t hashId = hashString(name_) ; 
    171         size_t currentTimeLine=0 ; 
    172         eventScheduler_->registerEvent(currentTimeLine,hashId);  
     181        if (notifyInType_!= NOTIFY_NOTHING) 
     182        { 
     183          hasNotification_=true ; 
     184          std::hash<string> hashString ; 
     185          size_t hashId = hashString(name_) ; 
     186          size_t currentTimeLine=0 ; 
     187          info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler"<<endl ; 
     188          eventScheduler_->registerEvent(currentTimeLine,hashId);  
     189        } 
     190        lastEventLoop_=time ; 
    173191      } 
    174192    } 
     
    179197      size_t hashId = hashString(name_) ; 
    180198      size_t currentTimeLine=0 ; 
     199      info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : eventIsReceived ?"<<endl ; 
    181200      if (eventScheduler_->queryEvent(currentTimeLine,hashId)) 
    182201      { 
    183202        eventScheduler_->popEvent() ; 
     203        info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : RECEIVED"<<endl ; 
    184204        if (notifyInType_==NOTIFY_CREATE_CONTEXT) createContext() ; 
    185205        hasNotification_=false ; 
     
    190210 
    191211 
    192  
     212//ym not use any more 
    193213  void CService::checkCreateContextNotification(void) 
    194214  { 
     
    210230  void CService::createContext(void) 
    211231   { 
     232     info(40)<<"CService::createContext(void)  : receive createContext notification"<<endl ; 
    212233     auto& arg=notifyInCreateContext_ ; 
    213234     string poolId = get<0>(arg) ; 
     
    218239   } 
    219240 
    220    //to remove 
     241   //to remove, not used anymore 
    221242   void CService::createNewContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId) 
    222243   { 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services.hpp

    r2130 r2246  
    7171    int nbPartitions_ ; 
    7272 
     73    const double eventLoopLatency_=1e-2;  
     74    double lastEventLoop_=0. ; 
     75 
    7376  }; 
    7477 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services_manager.cpp

    r1764 r2246  
    77#include "server.hpp" 
    88#include "servers_ressource.hpp" 
     9#include "timer.hpp" 
    910 
    1011namespace xios 
     
    5556    int poolSize ; 
    5657     
     58    info(40)<<"CServicesManager : waiting for pool info : "<<poolId<<endl ; ; 
    5759    bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ; 
    5860    if (wait) 
     
    6769    if (ok)  
    6870    { 
     71      info(40)<<"CServicesManager : create service notification to leader "<<leader<<", serviceId : "<<serviceId<<", size : "<<size<<endl ; 
    6972      createServicesNotify(leader, serviceId, type, size, nbPartitions) ; 
    7073      return true ; 
     
    9497    { 
    9598      auto info = notifications_.front() ; 
     99      xios::info(40)<<"CServicesManager : receive create service notification : "<<get<0>(info)<<endl ; 
    96100      CServer::getServersRessource()->getPoolRessource()->createService(get<0>(info), get<1>(info), get<2>(info), get<3>(info)) ; 
    97101      notifications_.pop_front() ; 
     
    104108  void CServicesManager::eventLoop(void) 
    105109  { 
    106     checkCreateServicesNotification() ; 
     110    CTimer::get("CServicesManager::eventLoop").resume(); 
     111    double time=MPI_Wtime() ; 
     112    if (time-lastEventLoop_ > eventLoopLatency_)  
     113    { 
     114      checkCreateServicesNotification() ; 
     115      lastEventLoop_=time ; 
     116    } 
     117    CTimer::get("CServicesManager::eventLoop").suspend(); 
    107118  } 
    108119 
     
    176187  { 
    177188     
     189    info(40)<<"CServicesManager : registering service, poolId : "<<poolId<<", serviceId : "<<serviceId<<endl ; ; 
     190 
     191    winServices_->lockWindowExclusive(managerGlobalLeader_) ; 
     192    winServices_->updateFromLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 
     193    winServices_->flushWindow(managerGlobalLeader_) ; 
     194    services_[std::tuple<std::string, std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ; 
     195    winServices_->updateToLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpOut) ; 
     196    winServices_->unlockWindow(managerGlobalLeader_) ; 
     197 
     198/* 
    178199    winServices_->lockWindow(managerGlobalLeader_,0) ; 
    179200    winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 
    180201    services_[std::tuple<std::string, std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ; 
    181202    winServices_->updateToWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpOut) ; 
    182     winServices_->unlockWindow(managerGlobalLeader_,0) ; 
     203    winServices_->unlockWindow(managerGlobalLeader_,0) ;*/ 
    183204  } 
    184205 
     
    186207                                        int& size, int& nbPartitions, int& leader) 
    187208  { 
     209     
     210    winServices_->lockWindowShared(managerGlobalLeader_) ; 
     211    winServices_->updateFromLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 
     212    winServices_->unlockWindow(managerGlobalLeader_) ; 
     213/* 
    188214    winServices_->lockWindow(managerGlobalLeader_,0) ; 
    189215    winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ; 
    190     winServices_->unlockWindow(managerGlobalLeader_,0) ; 
     216    winServices_->unlockWindow(managerGlobalLeader_,0) ;*/ 
    191217 
    192218    auto it=services_.find(std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)) ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services_manager.hpp

    r1764 r2246  
    5959 
    6060    int managerGlobalLeader_ ; 
     61 
     62    const double eventLoopLatency_=1e-2;  
     63    double lastEventLoop_=0. ; 
    6164     
    6265    friend class CWindowManager ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/window_manager.hpp

    r1764 r2246  
    2525    MPI_Win window_ ; 
    2626    void * winBuffer_ ; 
     27    map<int,double> lastTimeLock_ ; 
     28    const double latency_=0e-2 ;  
    2729 
    2830    public : 
     
    4648    { 
    4749      int lock=state ; 
    48            
     50      double time ; 
     51      auto it=lastTimeLock_.find(rank) ; 
     52      if (it == lastTimeLock_.end())  
     53      {  
     54        lastTimeLock_[rank] = 0. ;  
     55        it=lastTimeLock_.find(rank) ; 
     56      } 
     57      double& lastTime = it->second ; 
     58 
    4959      do  
    5060      { 
     61        time=MPI_Wtime() ; 
     62        while(time-lastTime < latency_) time=MPI_Wtime() ; 
    5163        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 
    5264        MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ; 
    5365        MPI_Win_unlock(rank, window_) ; 
     66        lastTime=MPI_Wtime() ; 
    5467      } while (lock!=state) ; 
    55  
    56        
    57     } 
    58  
     68       
     69       
     70    } 
     71 
     72    void lockWindowExclusive(int rank, int state ) 
     73    { 
     74      int lock=state ; 
     75      double time ; 
     76      auto it=lastTimeLock_.find(rank) ; 
     77      if (it == lastTimeLock_.end())  
     78      {  
     79        lastTimeLock_[rank] = 0. ;  
     80        it=lastTimeLock_.find(rank) ; 
     81      } 
     82      double& lastTime = it->second ; 
     83 
     84      do  
     85      { 
     86        time=MPI_Wtime() ; 
     87        while(time-lastTime < latency_) time=MPI_Wtime() ; 
     88        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 
     89        MPI_Compare_and_swap(&WINDOWS_LOCKED, &state, &lock, MPI_INT, rank, OFFSET_LOCK, window_) ; 
     90        MPI_Win_unlock(rank, window_) ; 
     91        lastTime=MPI_Wtime() ; 
     92      } while (lock!=state) ; 
     93    } 
     94 
     95    void lockWindowExclusive(int rank) 
     96    { 
     97      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 
     98    } 
     99 
     100    void lockWindowShared(int rank) 
     101    { 
     102      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ; 
     103    } 
     104 
     105    void unlockWindow(int rank) 
     106    { 
     107      MPI_Win_unlock(rank, window_) ; 
     108    } 
     109 
     110    void flushWindow(int rank) 
     111    { 
     112      MPI_Win_flush(rank, window_) ; 
     113    } 
    59114 
    60115    void unlockWindow(int rank, int state ) 
     
    77132      MPI_Win_unlock(rank, window_) ; 
    78133    } 
    79      
     134 
     135    template< class T > 
     136    void updateToLockedWindow(int rank, T* object, void (T::*dumpOut)(CBufferOut&) ) 
     137    { 
     138      CBufferOut buffer ; 
     139      (object->*dumpOut)(buffer) ; 
     140      size_t size=buffer.count() ; 
     141//      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, rank, 0, window_) ; 
     142      MPI_Put(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ; 
     143      MPI_Put(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ; 
     144//      MPI_Win_unlock(rank, window_) ; 
     145    } 
     146 
    80147    template< typename T > 
    81148    void updateFromWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) )  
     
    90157      (object->*dumpIn)(buffer) ; 
    91158    } 
     159 
     160    template< typename T > 
     161    void updateFromLockedWindow(int rank, T* object, void (T::*dumpIn)(CBufferIn&) )  
     162    { 
     163      size_t size ; 
     164//      MPI_Win_lock(MPI_LOCK_SHARED, rank, 0, window_) ; 
     165      MPI_Get(&size, SIZE_BUFFER_SIZE, MPI_CHAR, rank, OFFSET_BUFFER_SIZE, SIZE_BUFFER_SIZE, MPI_CHAR, window_) ; 
     166      MPI_Win_flush(rank,window_) ; 
     167      CBufferIn buffer(size) ; 
     168      MPI_Get(buffer.start(), size, MPI_CHAR, rank,OFFSET_BUFFER, size, MPI_CHAR, window_) ; 
     169//      MPI_Win_unlock(rank, window_) ; 
     170      MPI_Win_flush(rank, window_) ; 
     171      (object->*dumpIn)(buffer) ; 
     172    } 
     173 
    92174 
    93175    template< class T > 
Note: See TracChangeset for help on using the changeset viewer.