New URL for NEMO forge!   http://forge.nemo-ocean.eu

Since March 2022 along with NEMO 4.2 release, the code development moved to a self-hosted GitLab.
This present forge is now archived and remained online for history.
svnperms.py in vendors/FCM-2017.10.0/sbin – NEMO

source: vendors/FCM-2017.10.0/sbin/svnperms.py @ 11668

Last change on this file since 11668 was 10672, checked in by nicolasmartin, 5 years ago

Reimport latest FCM release

File size: 13.2 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4#
5# Licensed to the Apache Software Foundation (ASF) under one
6# or more contributor license agreements.  See the NOTICE file
7# distributed with this work for additional information
8# regarding copyright ownership.  The ASF licenses this file
9# to you under the Apache License, Version 2.0 (the
10# "License"); you may not use this file except in compliance
11# with the License.  You may obtain a copy of the License at
12#
13#   http://www.apache.org/licenses/LICENSE-2.0
14#
15# Unless required by applicable law or agreed to in writing,
16# software distributed under the License is distributed on an
17# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
18# KIND, either express or implied.  See the License for the
19# specific language governing permissions and limitations
20# under the License.
21#
22#
23
24# THIS DISTRIBUTION HAS BEEN MODIFIED.
25# Original source downloaded from r1295006 at:
26# https://svn.apache.org/viewvc/subversion/trunk/tools/hook-scripts/svnperms.py
27# This version is modified to allow custom permission message per repository.
28# It also fixes the Config.get method.
29
30import sys, os
31import getopt
32import shlex
33
34try:
35  # Python >=3.0
36  from subprocess import getstatusoutput as subprocess_getstatusoutput
37except ImportError:
38  # Python <3.0
39  from commands import getstatusoutput as subprocess_getstatusoutput
40try:
41    my_getopt = getopt.gnu_getopt
42except AttributeError:
43    my_getopt = getopt.getopt
44import re
45
46__author__ = "Gustavo Niemeyer <gustavo@niemeyer.net>"
47
48class Error(Exception): pass
49
50SECTION = re.compile(r'\[([^]]+?)(?:\s+extends\s+([^]]+))?\]')
51OPTION = re.compile(r'(\S+)\s*=\s*(.*)$')
52
53class Config:
54    def __init__(self, filename):
55        # Options are stored in __sections_list like this:
56        # [(sectname, [(optname, optval), ...]), ...]
57        self._sections_list = []
58        self._sections_dict = {}
59        self._read(filename)
60
61    def _read(self, filename):
62        # Use the same logic as in ConfigParser.__read()
63        file = open(filename)
64        cursectdict = None
65        optname = None
66        lineno = 0
67        for line in file:
68            lineno = lineno + 1
69            if line.isspace() or line[0] == '#':
70                continue
71            if line[0].isspace() and cursectdict is not None and optname:
72                value = line.strip()
73                cursectdict[optname] = "%s %s" % (cursectdict[optname], value)
74                cursectlist[-1][1] = "%s %s" % (cursectlist[-1][1], value)
75            else:
76                m = SECTION.match(line)
77                if m:
78                    sectname = m.group(1)
79                    parentsectname = m.group(2)
80                    if parentsectname is None:
81                        # No parent section defined, so start a new section
82                        cursectdict = self._sections_dict.setdefault \
83                            (sectname, {})
84                        cursectlist = []
85                    else:
86                        # Copy the parent section into the new section
87                        parentsectdict = self._sections_dict.get \
88                            (parentsectname, {})
89                        cursectdict = self._sections_dict.setdefault \
90                            (sectname, parentsectdict.copy())
91                        cursectlist = self.walk(parentsectname)
92                    self._sections_list.append((sectname, cursectlist))
93                    optname = None
94                elif cursectdict is None:
95                    raise Error("%s:%d: no section header" % \
96                                 (filename, lineno))
97                else:
98                    m = OPTION.match(line)
99                    if m:
100                        optname, optval = m.groups()
101                        optval = optval.strip()
102                        cursectdict[optname] = optval
103                        cursectlist.append([optname, optval])
104                    else:
105                        raise Error("%s:%d: parsing error" % \
106                                     (filename, lineno))
107
108    def sections(self):
109        return list(self._sections_dict.keys())
110
111    def options(self, section):
112        return list(self._sections_dict.get(section, {}).keys())
113
114    def get(self, section, option, default=None):
115        return self._sections_dict.get(section, {}).get(option, default)
116
117    def walk(self, section, option=None):
118        ret = []
119        for sectname, options in self._sections_list:
120            if sectname == section:
121                for optname, value in options:
122                    if not option or optname == option:
123                        ret.append((optname, value))
124        return ret
125
126
127class Permission:
128    def __init__(self):
129        self._group = {}
130        self._permlist = []
131
132    def parse_groups(self, groupsiter):
133        for option, value in groupsiter:
134            groupusers = []
135            for token in shlex.split(value):
136                # expand nested groups in place; no forward decls
137                if token[0] == "@":
138                    try:
139                        groupusers.extend(self._group[token[1:]])
140                    except KeyError:
141                        raise Error, "group '%s' not found" % token[1:]
142                else:
143                    groupusers.append(token)
144            self._group[option] = groupusers
145
146    def parse_perms(self, permsiter):
147        for option, value in permsiter:
148            # Paths never start with /, so remove it if provided
149            if option[0] == "/":
150                option = option[1:]
151            pattern = re.compile("^%s$" % option)
152            for entry in value.split():
153                openpar, closepar = entry.find("("), entry.find(")")
154                groupsusers = entry[:openpar].split(",")
155                perms = entry[openpar+1:closepar].split(",")
156                users = []
157                for groupuser in groupsusers:
158                    if groupuser[0] == "@":
159                        try:
160                            users.extend(self._group[groupuser[1:]])
161                        except KeyError:
162                            raise Error("group '%s' not found" % \
163                                         groupuser[1:])
164                    else:
165                        users.append(groupuser)
166                self._permlist.append((pattern, users, perms))
167
168    def get(self, user, path):
169        ret = []
170        for pattern, users, perms in self._permlist:
171            if pattern.match(path) and (user in users or "*" in users):
172                ret = perms
173        return ret
174
175class SVNLook:
176    def __init__(self, repospath, txn=None, rev=None):
177        self.repospath = repospath
178        self.txn = txn
179        self.rev = rev
180
181    def _execcmd(self, *cmd, **kwargs):
182        cmdstr = " ".join(cmd)
183        status, output = subprocess_getstatusoutput(cmdstr)
184        if status != 0:
185            sys.stderr.write(cmdstr)
186            sys.stderr.write("\n")
187            sys.stderr.write(output)
188            raise Error("command failed: %s\n%s" % (cmdstr, output))
189        return status, output
190
191    def _execsvnlook(self, cmd, *args, **kwargs):
192        execcmd_args = ["svnlook", cmd, self.repospath]
193        self._add_txnrev(execcmd_args, kwargs)
194        execcmd_args += args
195        execcmd_kwargs = {}
196        keywords = ["show", "noerror"]
197        for key in keywords:
198            if key in kwargs:
199                execcmd_kwargs[key] = kwargs[key]
200        return self._execcmd(*execcmd_args, **execcmd_kwargs)
201
202    def _add_txnrev(self, cmd_args, received_kwargs):
203        if "txn" in received_kwargs:
204            txn = received_kwargs.get("txn")
205            if txn is not None:
206                cmd_args += ["-t", txn]
207        elif self.txn is not None:
208            cmd_args += ["-t", self.txn]
209        if "rev" in received_kwargs:
210            rev = received_kwargs.get("rev")
211            if rev is not None:
212                cmd_args += ["-r", rev]
213        elif self.rev is not None:
214            cmd_args += ["-r", self.rev]
215
216    def changed(self, **kwargs):
217        status, output = self._execsvnlook("changed", **kwargs)
218        if status != 0:
219            return None
220        changes = []
221        for line in output.splitlines():
222            line = line.rstrip()
223            if not line: continue
224            entry = [None, None, None]
225            changedata, changeprop, path = None, None, None
226            if line[0] != "_":
227                changedata = line[0]
228            if line[1] != " ":
229                changeprop = line[1]
230            path = line[4:]
231            changes.append((changedata, changeprop, path))
232        return changes
233
234    def author(self, **kwargs):
235        status, output = self._execsvnlook("author", **kwargs)
236        if status != 0:
237            return None
238        return output.strip()
239
240
241def check_perms(filename, section, repos, txn=None, rev=None, author=None):
242    svnlook = SVNLook(repos, txn=txn, rev=rev)
243    if author is None:
244        author = svnlook.author()
245    changes = svnlook.changed()
246    try:
247        config = Config(filename)
248    except IOError:
249        raise Error("can't read config file "+filename)
250    if not section in config.sections():
251        raise Error("section '%s' not found in config file" % section)
252    perm = Permission()
253    perm.parse_groups(config.walk("groups"))
254    perm.parse_groups(config.walk(section+" groups"))
255    perm.parse_perms(config.walk(section))
256    permerrors = []
257    for changedata, changeprop, path in changes:
258        pathperms = perm.get(author, path)
259        if changedata == "A" and "add" not in pathperms:
260            permerrors.append("you can't add "+path)
261        elif changedata == "U" and "update" not in pathperms:
262            permerrors.append("you can't update "+path)
263        elif changedata == "D" and "remove" not in pathperms:
264            permerrors.append("you can't remove "+path)
265        elif changeprop == "U" and "update" not in pathperms:
266            permerrors.append("you can't update properties of "+path)
267        #else:
268        #    print "cdata=%s cprop=%s path=%s perms=%s" % \
269        #          (str(changedata), str(changeprop), path, str(pathperms))
270    if permerrors:
271        message = config.get(" ".join((section, "message")),
272                             "permerrors_prefix")
273        if message is None:
274            message = config.get("message", "permerrors_prefix")
275        if message is None:
276            message = "you don't have enough permissions for this transaction:"
277        permerrors.insert(0, message)
278        raise Error("\n".join(permerrors))
279
280
281# Command:
282
283USAGE = """\
284Usage: svnperms.py OPTIONS
285
286Options:
287    -r PATH    Use repository at PATH to check transactions
288    -t TXN     Query transaction TXN for commit information
289    -f PATH    Use PATH as configuration file (default is repository
290               path + /conf/svnperms.conf)
291    -s NAME    Use section NAME as permission section (default is
292               repository name, extracted from repository path)
293    -R REV     Query revision REV for commit information (for tests)
294    -A AUTHOR  Check commit as if AUTHOR had committed it (for tests)
295    -h         Show this message
296"""
297
298class MissingArgumentsException(Exception):
299    "Thrown when required arguments are missing."
300    pass
301
302def parse_options():
303    try:
304        opts, args = my_getopt(sys.argv[1:], "f:s:r:t:R:A:h", ["help"])
305    except getopt.GetoptError, e:
306        raise Error(e.msg)
307    class Options: pass
308    obj = Options()
309    obj.filename = None
310    obj.section = None
311    obj.repository = None
312    obj.transaction = None
313    obj.revision = None
314    obj.author = None
315    for opt, val in opts:
316        if opt == "-f":
317            obj.filename = val
318        elif opt == "-s":
319            obj.section = val
320        elif opt == "-r":
321            obj.repository = val
322        elif opt == "-t":
323            obj.transaction = val
324        elif opt == "-R":
325            obj.revision = val
326        elif opt == "-A":
327            obj.author = val
328        elif opt in ["-h", "--help"]:
329            sys.stdout.write(USAGE)
330            sys.exit(0)
331    missingopts = []
332    if not obj.repository:
333        missingopts.append("repository")
334    if not (obj.transaction or obj.revision):
335        missingopts.append("either transaction or a revision")
336    if missingopts:
337        raise MissingArgumentsException("missing required option(s): " + ", ".join(missingopts))
338    obj.repository = os.path.abspath(obj.repository)
339    if obj.filename is None:
340        obj.filename = os.path.join(obj.repository, "conf", "svnperms.conf")
341    if obj.section is None:
342        obj.section = os.path.basename(obj.repository)
343    if not (os.path.isdir(obj.repository) and
344            os.path.isdir(os.path.join(obj.repository, "db")) and
345            os.path.isdir(os.path.join(obj.repository, "hooks")) and
346            os.path.isfile(os.path.join(obj.repository, "format"))):
347        raise Error("path '%s' doesn't look like a repository" % \
348                     obj.repository)
349
350    return obj
351
352def main():
353    try:
354        opts = parse_options()
355        check_perms(opts.filename, opts.section,
356                    opts.repository, opts.transaction, opts.revision,
357                    opts.author)
358    except MissingArgumentsException, e:
359        sys.stderr.write("%s\n" % str(e))
360        sys.stderr.write(USAGE)
361        sys.exit(1)
362    except Error, e:
363        sys.stderr.write("error: %s\n" % str(e))
364        sys.exit(1)
365
366if __name__ == "__main__":
367    main()
368
369# vim:et:ts=4:sw=4
Note: See TracBrowser for help on using the repository browser.