New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
svnperms.py in vendors/fcm/current/tutorial/hooks – NEMO

source: vendors/fcm/current/tutorial/hooks/svnperms.py @ 1977

Last change on this file since 1977 was 1977, checked in by flavoni, 14 years ago

importing fcm vendor

File size: 11.1 KB
Line 
1#!/usr/bin/env python
2
3# $HeadURL: https://svn.collab.net/repos/svn/trunk/tools/hook-scripts/svnperms.py $
4# $LastChangedDate: 2005-03-17 04:39:51 -0600 (Thu, 17 Mar 2005) $
5# $LastChangedBy: maxb $
6# $LastChangedRevision: 13456 $
7
8import commands
9import sys, os
10import getopt
11import re
12
13__author__ = "Gustavo Niemeyer <niemeyer@conectiva.com>"
14
15class Error(Exception): pass
16
17SECTION = re.compile(r'\[([^]]+)\]')
18OPTION = re.compile(r'(\S+)\s*=\s*(.*)$')
19
20class Config:
21    def __init__(self, filename):
22        # Options are stored in __sections_list like this:
23        # [(sectname, [(optname, optval), ...]), ...]
24        self._sections_list = []
25        self._sections_dict = {}
26        self._read(filename)
27
28    def _read(self, filename):
29        # Use the same logic as in ConfigParser.__read()
30        file = open(filename)
31        cursectdict = None
32        optname = None
33        lineno = 0
34        for line in file.xreadlines():
35            lineno = lineno + 1
36            if line.isspace() or line[0] == '#':
37                continue
38            if line[0].isspace() and cursectdict is not None and optname:
39                value = line.strip()
40                cursectdict[optname] = "%s %s" % (cursectdict[optname], value)
41                cursectlist[-1][1] = "%s %s" % (cursectlist[-1][1], value)
42            else:
43                m = SECTION.match(line)
44                if m:
45                    sectname = m.group(1)
46                    cursectdict = self._sections_dict.setdefault(sectname, {})
47                    cursectlist = []
48                    self._sections_list.append((sectname, cursectlist))
49                    optname = None
50                elif cursectdict is None:
51                    raise Error, "%s:%d: no section header" % \
52                                 (filename, lineno)
53                else:
54                    m = OPTION.match(line)
55                    if m:
56                        optname, optval = m.groups()
57                        optval = optval.strip()
58                        cursectdict[optname] = optval
59                        cursectlist.append([optname, optval])
60                    else:
61                        raise Error, "%s:%d: parsing error" % \
62                                     (filename, lineno)
63
64    def sections(self):
65        return self._sections_dict.keys()
66
67    def options(self, section):
68        return self._sections_dict.get(section, {}).keys()
69
70    def get(self, section, option, default=None):
71        return self._sections_dict.get(section, {}).get(option, default)
72
73    def walk(self, section, option=None):
74        ret = []
75        for sectname, options in self._sections_list:
76            if sectname == section:
77                for optname, value in options:
78                    if not option or optname == option:
79                        ret.append((optname, value))
80        return ret
81
82
83class Permission:
84    def __init__(self):
85        self._group = {}
86        self._permlist = []
87   
88    def parse_groups(self, groupsiter):
89        for option, value in groupsiter:
90            self._group[option] = value.split()
91           
92    def parse_perms(self, permsiter):
93        for option, value in permsiter:
94            # Paths never start with /, so remove it if provided
95            if option[0] == "/":
96                option = option[1:]
97            pattern = re.compile("^%s$" % option)
98            for entry in value.split():
99                openpar, closepar = entry.find("("), entry.find(")")
100                groupsusers = entry[:openpar].split(",")
101                perms = entry[openpar+1:closepar].split(",")
102                users = []
103                for groupuser in groupsusers:
104                    if groupuser[0] == "@":
105                        try:
106                            users.extend(self._group[groupuser[1:]])
107                        except KeyError:
108                            raise Error, "group '%s' not found" % \
109                                         groupuser[1:]
110                    else:
111                        users.append(groupuser)
112                self._permlist.append((pattern, users, perms))
113
114    def get(self, user, path):
115        ret = []
116        for pattern, users, perms in self._permlist:
117            if pattern.match(path) and (user in users or "*" in users):
118                ret = perms
119        return ret
120
121class SVNLook:
122    def __init__(self, repospath, txn=None, rev=None):
123        self.repospath = repospath
124        self.txn = txn
125        self.rev = rev
126
127    def _execcmd(self, *cmd, **kwargs):
128        cmdstr = " ".join(cmd)
129        status, output = commands.getstatusoutput(cmdstr)
130        if status != 0:
131            sys.stderr.write(cmdstr)
132            sys.stderr.write("\n")
133            sys.stderr.write(output)
134            raise Error, "command failed: %s\n%s" % (cmdstr, output)
135        return status, output
136
137    def _execsvnlook(self, cmd, *args, **kwargs):
138        execcmd_args = ["svnlook", cmd, self.repospath]
139        self._add_txnrev(execcmd_args, kwargs)
140        execcmd_args += args
141        execcmd_kwargs = {}
142        keywords = ["show", "noerror"]
143        for key in keywords:
144            if kwargs.has_key(key):
145                execcmd_kwargs[key] = kwargs[key]
146        return self._execcmd(*execcmd_args, **execcmd_kwargs)
147
148    def _add_txnrev(self, cmd_args, received_kwargs):
149        if received_kwargs.has_key("txn"):
150            txn = received_kwargs.get("txn")
151            if txn is not None:
152                cmd_args += ["-t", txn]
153        elif self.txn is not None:
154            cmd_args += ["-t", self.txn]
155        if received_kwargs.has_key("rev"):
156            rev = received_kwargs.get("rev")
157            if rev is not None:
158                cmd_args += ["-r", rev]
159        elif self.rev is not None:
160            cmd_args += ["-r", self.rev]
161
162    def changed(self, **kwargs):
163        status, output = self._execsvnlook("changed", **kwargs)
164        if status != 0:
165            return None
166        changes = []
167        for line in output.splitlines():
168            line = line.rstrip()
169            if not line: continue
170            entry = [None, None, None]
171            changedata, changeprop, path = None, None, None
172            if line[0] != "_":
173                changedata = line[0]
174            if line[1] != " ":
175                changeprop = line[1]
176            path = line[4:]
177            changes.append((changedata, changeprop, path))
178        return changes
179
180    def author(self, **kwargs):
181        status, output = self._execsvnlook("author", **kwargs)
182        if status != 0:
183            return None
184        return output.strip()
185
186
187def check_perms(filename, section, repos, txn=None, rev=None, author=None):
188    svnlook = SVNLook(repos, txn=txn, rev=rev)
189    if author is None:
190        author = svnlook.author()
191    changes = svnlook.changed()
192    try:
193        config = Config(filename)
194    except IOError:
195        raise Error, "can't read config file "+filename
196    if not section in config.sections():
197        raise Error, "section '%s' not found in config file" % section
198    perm = Permission()
199    perm.parse_groups(config.walk("groups"))
200    perm.parse_groups(config.walk(section+" groups"))
201    perm.parse_perms(config.walk(section))
202    permerrors = []
203    for changedata, changeprop, path in changes:
204        pathperms = perm.get(author, path)
205        if changedata == "A" and "add" not in pathperms:
206            permerrors.append("you can't add "+path)
207        elif changedata == "U" and "update" not in pathperms:
208            permerrors.append("you can't update "+path)
209        elif changedata == "D" and "remove" not in pathperms:
210            permerrors.append("you can't remove "+path)
211        elif changeprop == "U" and "update" not in pathperms:
212            permerrors.append("you can't update properties of "+path)
213        #else:
214        #    print "cdata=%s cprop=%s path=%s perms=%s" % \
215        #          (str(changedata), str(changeprop), path, str(pathperms))
216    if permerrors:
217        SEC_NAME = "message"
218        OPT_NAME = "permerrors_prefix"
219        message = config.get(SEC_NAME, OPT_NAME)
220        if message is None:
221            message = config.get(" ".join((section, SEC_NAME)), OPT_NAME)
222        if message is None:
223            message = "you don't have enough permissions for this transaction:"
224        permerrors.insert(0, message)
225        raise Error, "\n".join(permerrors)
226
227
228# Command:
229
230USAGE = """\
231Usage: svnperms.py OPTIONS
232
233Options:
234    -r PATH    Use repository at PATH to check transactions
235    -t TXN     Query transaction TXN for commit information
236    -f PATH    Use PATH as configuration file (default is repository
237               path + /conf/svnperms.conf)
238    -s NAME    Use section NAME as permission section (default is
239               repository name, extracted from repository path)
240    -R REV     Query revision REV for commit information (for tests)
241    -A AUTHOR  Check commit as if AUTHOR had commited it (for tests)
242    -h         Show this message
243"""
244
245class MissingArgumentsException(Exception):
246    "Thrown when required arguments are missing."
247    pass
248
249def parse_options():
250    try:
251        opts, args = getopt.getopt(sys.argv[1:], "f:s:r:t:R:A:h", ["help"])
252    except getopt.GetoptError, e:
253        raise Error, e.msg
254    class Options: pass
255    obj = Options()
256    obj.filename = None
257    obj.section = None
258    obj.repository = None
259    obj.transaction = None
260    obj.revision = None
261    obj.author = None
262    for opt, val in opts:
263        if opt == "-f":
264            obj.filename = val
265        elif opt == "-s":
266            obj.section = val
267        elif opt == "-r":
268            obj.repository = val
269        elif opt == "-t":
270            obj.transaction = val
271        elif opt == "-R":
272            obj.revision = val
273        elif opt == "-A":
274            obj.author = val
275        elif opt in ["-h", "--help"]:
276            sys.stdout.write(USAGE)
277            sys.exit(0)
278    missingopts = []
279    if not obj.repository:
280        missingopts.append("repository")
281    if not (obj.transaction or obj.revision):
282        missingopts.append("either transaction or a revision")
283    if missingopts:
284        raise MissingArgumentsException, \
285              "missing required option(s): " + ", ".join(missingopts)
286    obj.repository = os.path.abspath(obj.repository)
287    if obj.filename is None:
288        obj.filename = os.path.join(obj.repository, "conf", "svnperms.conf")
289    if obj.section is None:
290        obj.section = os.path.basename(obj.repository)
291    if not (os.path.isdir(obj.repository) and
292            os.path.isdir(os.path.join(obj.repository, "db")) and
293            os.path.isdir(os.path.join(obj.repository, "hooks")) and
294            os.path.isfile(os.path.join(obj.repository, "format"))):
295        raise Error, "path '%s' doesn't look like a repository" % \
296                     obj.repository
297       
298    return obj
299
300def main():
301    try:
302        opts = parse_options()
303        check_perms(opts.filename, opts.section,
304                    opts.repository, opts.transaction, opts.revision,
305                    opts.author)
306    except MissingArgumentsException, e:
307        sys.stderr.write("%s\n" % str(e))
308        sys.stderr.write(USAGE)
309        sys.exit(1)
310    except Error, e:
311        sys.stderr.write("error: %s\n" % str(e))
312        sys.exit(1)
313
314if __name__ == "__main__":
315    main()
316
317# vim:et:ts=4:sw=4
Note: See TracBrowser for help on using the repository browser.