source: XIOS3/trunk/src/manager/window_dynamic.hpp @ 2547

Last change on this file since 2547 was 2547, checked in by ymipsl, 10 months ago

Major update :

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead
  • Introducing multithreading on server sided to manage more efficiently dead-lock occuring (similar to co-routine which will be available and implemented in futur c++ standard), based on c++ threads
  • Suprression of old "attached mode" which is replaced by online writer and reder filters

YM

  • Property svn:executable set to *
File size: 6.3 KB
Line 
1#ifndef __WINDOW_DYNAMIC_HPP__
2#define __WINDOW_DYNAMIC_HPP__
3
4#include <map>
5//#include "exception.hpp"
6#include "mpi.hpp"
7#include "cxios.hpp"
8#include <iostream>
9
10namespace xios
11{
12
13
14  class CWindowDynamic
15  {
16    private:
17      void * winBuffer_ ;   
18      const MPI_Aint OFFSET_LOCK=0 ;
19      const int SIZE_LOCK=sizeof(long) ;
20      const MPI_Aint OFFSET_BUFFER =  SIZE_LOCK ;
21      const MPI_Aint OFFSET_BUFFER_SIZE = SIZE_LOCK   ;
22      MPI_Aint bufferSize_ ;
23      const double maxLatency_ = 1e-3 ; // 1ms latency maximum
24      MPI_Win window_ ;
25      MPI_Aint windowSize_ ;
26      int winCommRank_ ;
27      map<int,MPI_Aint> winBufferAddress_ ;
28
29
30    public :
31
32    void allocateBuffer(MPI_Aint size)
33    {
34      bufferSize_ = size ;
35      windowSize_ = size+OFFSET_BUFFER_SIZE ;
36      MPI_Alloc_mem(windowSize_, MPI_INFO_NULL, &winBuffer_) ;
37      MPI_Aint& lock = *((MPI_Aint*)(static_cast<char*>(winBuffer_)+OFFSET_LOCK)) ;
38      lock=0 ;
39    } 
40
41    void create(MPI_Comm winComm)
42    {
43      MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &window_);
44      CXios::getMpiGarbageCollector().registerWindow(window_) ;
45      MPI_Barrier(winComm) ;
46      MPI_Comm_rank(winComm, &winCommRank_) ;
47      MPI_Win_lock_all(0, window_) ;
48    }
49   
50    void lockAll()
51    {
52      MPI_Win_lock_all(0, window_) ;
53    }
54
55    void unlockAll()
56    {
57      MPI_Win_unlock_all(window_) ;
58    }
59
60    void setWinBufferAddress(MPI_Aint addr, int rank)
61    {
62      winBufferAddress_[rank]=addr ;
63    }
64   
65    MPI_Aint getWinBufferAddress()
66    {
67      MPI_Aint ret ;
68      MPI_Get_address(winBuffer_, &ret) ;
69      return ret ;
70    }
71
72    void* getBufferAddress()
73    {
74      return static_cast<char*>(winBuffer_)+OFFSET_BUFFER_SIZE ;
75    }
76   
77    bool tryLockExclusive(int rank)
78    {
79      long lock = 1;
80      long unlock = 0;
81      long state;
82
83      int flag ;
84      if (rank==winCommRank_) MPI_Win_sync(window_) ;
85      MPI_Compare_and_swap(&lock, &unlock, &state, MPI_LONG, rank, winBufferAddress_[rank]+OFFSET_LOCK, window_) ;
86      MPI_Win_flush(rank, window_);
87//      if (rank==winCommRank_) MPI_Win_sync(window_) ;
88      bool locked = (state == unlock) ;
89      return locked ;
90    }
91
92    bool tryLockShared(int rank, MPI_Op op)
93    {
94      long one = 0x100000000;
95      long res;
96
97      MPI_Fetch_and_op(&one, &res, MPI_LONG, rank, winBufferAddress_[rank]+OFFSET_LOCK, op, window_);
98      MPI_Win_flush(rank, window_);
99     
100      bool locked =  ! (res & 1) ;
101      return locked ;
102    }
103
104    void unlockExclusive(int rank)
105    {
106      int lock = 1;
107      int unlock = 0;
108      int state;
109     
110      if (rank==winCommRank_) MPI_Win_sync(window_) ;
111      MPI_Win_flush(rank, window_);
112      MPI_Compare_and_swap(&unlock, &lock, &state, MPI_INT, rank, winBufferAddress_[rank]+OFFSET_LOCK, window_) ;
113      MPI_Win_flush(rank, window_);
114//      if (rank==winCommRank_) MPI_Win_sync(window_) ;
115      if (lock != state) 
116      {
117        info(100)<<"Bad State : "<<((long*)winBuffer_)[0]<<endl ;
118        ERROR("CWindowBase::unlockWindowExclusive",<<"unlockWindow failed: bad state"<<endl) ; 
119      }
120    }
121
122    void unlockShared(int rank)
123    {
124      int minusone = -1;
125      int res;
126      MPI_Fetch_and_op(&minusone, &res, MPI_INT, rank, winBufferAddress_[rank]+OFFSET_LOCK+4, MPI_SUM, window_);
127      MPI_Win_flush(rank, window_);
128    }
129
130    void lockExclusive(int rank)
131    {
132      double time =  MPI_Wtime() ;
133      bool locked = tryLockExclusive(rank);
134      double lastTime = MPI_Wtime() ;
135      double delta = lastTime-time ;
136     
137      while (!locked)
138      {
139        time = MPI_Wtime() ;
140        if (delta > maxLatency_) delta = maxLatency_ ;
141        if (time >= lastTime+delta)
142        { 
143          locked = tryLockExclusive(rank);
144          delta=delta*2.;
145          lastTime = time ;     
146        }
147      } 
148    }
149
150    void lockShared(int rank)
151    {
152      double time =  MPI_Wtime() ;
153      bool locked = tryLockShared(rank, MPI_SUM);
154      double lastTime = MPI_Wtime() ;
155      double delta = lastTime-time ;
156     
157      while (!locked)
158      {
159        time = MPI_Wtime() ;
160        if (delta > maxLatency_) delta = maxLatency_ ;
161        if (time >= lastTime+delta)
162        { 
163          locked = tryLockShared(rank, MPI_NO_OP);
164          delta=delta*2.;
165          lastTime = time ;     
166        }
167      } 
168    }
169   
170    int attach(MPI_Aint size) 
171    {
172      windowSize_ = size+OFFSET_BUFFER_SIZE ;
173      MPI_Alloc_mem(windowSize_, MPI_INFO_NULL, &winBuffer_) ;
174      MPI_Aint& lock = *((MPI_Aint*)(static_cast<char*>(winBuffer_)+OFFSET_LOCK)) ;
175      lock=0 ;
176      MPI_Win_attach(window_, winBuffer_, size+OFFSET_BUFFER_SIZE) ;
177      setWinBufferAddress(getWinBufferAddress(),winCommRank_) ;
178    }
179   
180    int attach() 
181    {
182      MPI_Win_attach(window_, winBuffer_, windowSize_) ;
183      setWinBufferAddress(getWinBufferAddress(),winCommRank_) ;
184    }
185
186    int detach() 
187    {
188      MPI_Win_detach(window_, winBuffer_) ;
189      MPI_Free_mem(winBuffer_) ;       
190    }
191
192    int flush(int rank)
193    {
194      return MPI_Win_flush(rank, window_) ;
195    }
196
197    int put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp,
198            int target_count, MPI_Datatype target_datatype)
199    {
200      return MPI_Put(origin_addr, origin_count, origin_datatype, target_rank,  target_disp + OFFSET_BUFFER, target_count, target_datatype, window_) ;
201    }
202
203    int get(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp,
204            int target_count, MPI_Datatype target_datatype)
205    {
206      return MPI_Get(origin_addr, origin_count, origin_datatype, target_rank, target_disp + OFFSET_BUFFER, target_count, target_datatype, window_) ;
207    }
208
209    int compareAndSwap(const void *origin_addr, const void *compare_addr, void *result_addr, MPI_Datatype datatype,
210                       int target_rank, MPI_Aint target_disp)
211    {
212      return MPI_Compare_and_swap(origin_addr, compare_addr, result_addr, datatype, target_rank, target_disp + OFFSET_BUFFER, window_) ;
213    }
214
215    ~CWindowDynamic()
216    {
217      MPI_Win_unlock_all(window_) ;
218    }
219
220  } ;
221}
222
223
224
225#endif
Note: See TracBrowser for help on using the repository browser.