source: LATMOS-Accounts/lib/LATMOS/Accounts/Synchro.pm @ 815

Last change on this file since 815 was 815, checked in by nanardon, 14 years ago
  • use lock on base files based to ensure data are not read while another process write it
  • Property svn:keywords set to Id Rev
File size: 8.5 KB
Line 
1package LATMOS::Accounts::Synchro;
2
3use 5.010000;
4use strict;
5use warnings;
6use base qw(Config::IniFiles);
7use LATMOS::Accounts::Bases;
8use LATMOS::Accounts::Log;
9use LATMOS::Accounts::Utils qw(exec_command);
10use Fcntl qw(:flock);
11
12=head1 NAME
13
14LATMOS::Accounts::Synchro - Perl extension for blah blah blah
15
16=head1 SYNOPSIS
17
18  use LATMOS::Accounts;
19  blah blah blah
20
21=head1 DESCRIPTION
22
23Stub documentation for LATMOS::Accounts, created by h2xs. It looks like the
24author of the extension was negligent enough to leave the stub
25unedited.
26
27Blah blah blah.
28
29=head1 FUNCTIONS
30
31=cut
32
33our $VERSION = (q$Rev$ =~ /^Rev: (\d+) /)[0];
34
35=head2 new($from, $to, %options)
36
37Create a new synchronisation where $from and $to are LATMOS::Accounts::Base
38based objects. $to can be an array ref of objects.
39
40=cut
41
42sub new {
43    my ($class, $from, $to, %options) = @_;
44
45    my $state_file = $options{state_dir}
46        ? $options{state_dir} . '/synchronisation.ini'
47        : undef;
48    if ($state_file && ! -w $state_file) {
49        # don't exists, we have to create it
50        open(my $handle, '>', $state_file) or do {
51            la_log(LA_ERR, "Cannot open status file %s", $state_file);
52            return;
53        };
54        print $handle "[_default_]\n";
55        close($handle);
56    }
57
58    my $self = Config::IniFiles->new(
59        $state_file
60        ? (-file => $state_file)
61        : (),
62    );
63
64    if ($state_file && !$self->GetFileName) {
65        $self->SetFileName($state_file);
66    }
67   
68    $self->{from} = $from or do {
69        la_log(LA_ERR, "No database source");
70        return;
71    };
72    $self->{options} = { %options };
73
74    # allow ref and array ref of, eg
75    # to = $foo and $to = [ $foo, $bar ]
76    foreach (ref($to) eq 'ARRAY' ? @{ $to || []} : ($to)) {
77        push(@{$self->{to}}, $_);
78    }
79    bless($self, $class)
80}
81
82sub name {
83    $_[0]->{options}{name}
84}
85
86sub from {
87    my ($self) = @_;
88    return $self->{from}
89}
90
91sub to {
92    my ($self) = @_;
93    return @{$self->{to} || []};
94}
95
96=head2 load_dest
97
98Try to loaded all base, return the count of filtrered base which cannot
99be loaded
100
101=cut
102
103sub load_dest {
104    my ($self) = @_;
105    my @loaded;
106    my $unloaded = 0;
107    foreach ($self->to) {
108        if($_->load) {
109            push(@loaded, $_);
110        } else {
111            $unloaded++;
112            warn "Cannot load $_";
113        }
114    }
115    $self->{to} = \@loaded;
116    return $unloaded;
117}
118
119sub enter_synch_mode {
120    my ($self) = @_;
121    $self->from->load or return;
122    # if any cannot be loaded, return,
123    # TODO we need a way to force if some still can be sync
124    $self->load_dest and return;
125    my %state = ();
126    $state{$self->from->label} = $self->from->wexported(0);
127    foreach ($self->to) {
128        $state{$_->label} = $_->wexported(1);
129    }
130    la_log(LA_DEBUG, "Entering synch mode, old state: %s", join(', ', map {
131            "$_ => $state{$_}" } sort keys %state));
132    %state
133}
134
135sub leave_synch_mode {
136    my ($self, %state) = @_;
137    la_log(LA_DEBUG, "Leaving synch mode");
138    $self->from->wexported($state{$self->from->label});
139    foreach ($self->to) {
140        $_->commit;
141        $_->wexported($state{$_->label});
142    }
143    $self->run_post_synchro;
144}
145
146sub lock {
147    my ($self) = @_;
148
149    $self->{lock}{handle} and return 1;
150    la_log(LA_DEBUG, "Trying to lock (pid $$)");
151    if ($self->{options}{state_dir}) {
152        my $lockfile = $self->{options}{state_dir} . '/synclock';
153        open(my $handle, '>>', $lockfile) or return;
154        flock($handle, LOCK_EX);
155        $self->{lock}{handle} = $handle;
156        $self->{lock}{filename} = $lockfile;
157        la_log(LA_DEBUG, "lock done (pid $$)");
158        return 1;
159    } else { return 1 }
160}
161
162sub unlock {
163    my ($self) = @_;
164    if (my $handle = $self->{lock}{handle}) {
165        close($handle);
166        delete($self->{lock}{handle});
167        unlink($self->{lock}{filename});
168        delete($self->{lock}{filename});
169        return 1;
170    }
171    return;
172}
173
174sub sync_object {
175    my ($self, $otype, $uid, %options) = @_;
176
177    $self->lock or return;
178
179    my %state = $self->enter_synch_mode;
180   
181    my $res = $self->_sync_object($otype, $uid, %options);
182
183    $self->leave_synch_mode(%state);
184
185    $self->unlock;
186
187    $res;
188}
189
190sub _sync_object {
191    my ($self, $otype, $uid, %options) = @_;
192    foreach ($self->to) {
193        my $res = $_->sync_object_from($self->from, $otype, $uid, %options);
194        if (defined $res) {
195            la_log(LA_NOTICE, $_->label . " $uid ($otype) $res") if ($res);
196            return 1;
197        } else {
198            la_log(LA_ERR, "error synching $uid ($otype) to " . $_->label);
199            return;
200        }
201    }
202}
203
204sub process {
205    my ($self, %options) = @_;
206
207    $self->lock or return;
208
209    my %state = $self->enter_synch_mode;
210
211    # tracking current base revision:
212    $self->{current_rev} = $self->from->current_rev;
213
214    my %desterror;
215    my %existings;
216    foreach my $destbase ($self->to) {
217        my %objlist;
218        foreach my $otype ($self->from->list_supported_objects) {
219            $destbase->is_supported_object($otype) or next;
220
221            $existings{$otype} ||= { map { $_ => 1 }
222                $self->from->list_objects($otype) };
223
224            # deleting non existing object in dest:
225            foreach ($destbase->list_objects($otype)) {
226                if(!$existings{$otype}{$_}) {
227                    my $res = $destbase->sync_object_from($self->from, $otype, $_, %options);
228                    la_log(LA_NOTICE, "%s::%s::%s => %s %s",
229                        $self->from->label, $otype, $_, $destbase->label, $res,
230                    ) if ($res);
231                }
232            }
233
234            @{$objlist{$otype}} = $self->from->list_objects_from_rev(
235                $otype,
236                $self->val($self->from->label, $destbase->label, 0),
237            );
238        }
239        foreach my $pass (1, 0) {
240            foreach my $otype (
241                sort { $a eq 'user' ? 1 : -1 } # user in last because gidNumber needed
242                keys %objlist) {
243                next if (!$pass && !$destbase->delayed_fields($otype));
244                foreach (@{$objlist{$otype} || []}) {
245                    my $res = $destbase->sync_object_from($self->from, $otype, $_,
246                        %options, firstpass => $pass);
247                    if (defined $res) {
248                        la_log(LA_NOTICE, "%s::%s::%s => %s %s",
249                            $self->from->label, $otype, $_,
250                            $destbase->label, $res,
251                        ) if ($res);
252                    } else {
253                        la_log(LA_ERR, "Cannot synch %s::%s::%s => %s",
254                            $self->from->label, $otype, $_,
255                            $destbase->label,
256                        );
257                        $desterror{$destbase->label} = 1;
258                    }
259
260                }
261            }
262        }
263    }
264
265    $self->leave_synch_mode(%state);
266    foreach my $destbase ($self->to) {
267        # don't register checkpoint on error
268        if ($desterror{$destbase->label}) { next; }
269        $self->newval($self->from->label, $destbase->label, $self->{current_rev});
270    }
271
272    if(!($self->{options}{nocreate} ||
273            $self->{options}{test})) {
274        $self->write_status;
275    }
276
277    $self->unlock;
278
279    1;
280}
281
282sub write_status {
283    my ($self) = @_;
284    if (my $file = $self->GetFileName) {
285        open(my $handle, '>', $file) or do {
286            la_log(LA_ERR, "Cannot open status file %s for writing: %s",
287                $file, $!);
288            return;
289        };
290        my $oldfh = select($handle);
291        $self->OutputConfig();
292        select($oldfh);
293        close($handle); 
294        return 1;
295    }
296
297    return 0;
298}
299
300sub run_post_synchro {
301    my ($self) = @_;
302
303    $self->{options}{post} or return 1;
304
305    la_log(LA_INFO, "Running post synchro `%s'", $self->{options}{post});
306
307    exec_command($self->{options}{post}, $self->{options});
308}
309
310
3111;
312
313__END__
314# Below is stub documentation for your module. You'd better edit it!
315
316=head1 SEE ALSO
317
318Mention other useful documentation such as the documentation of
319related modules or operating system documentation (such as man pages
320in UNIX), or any relevant external documentation such as RFCs or
321standards.
322
323If you have a mailing list set up for your module, mention it here.
324
325If you have a web site set up for your module, mention it here.
326
327=head1 AUTHOR
328
329Thauvin Olivier, E<lt>olivier.thauvin@latmosipsl.frE<gt>
330
331=head1 COPYRIGHT AND LICENSE
332
333Copyright (C) 2009 by Thauvin Olivier
334
335This library is free software; you can redistribute it and/or modify
336it under the same terms as Perl itself, either Perl version 5.10.0 or,
337at your option, any later version of Perl 5 you may have available.
338
339
340=cut
Note: See TracBrowser for help on using the repository browser.