source: LATMOS-Accounts/lib/LATMOS/Accounts/Synchro.pm @ 354

Last change on this file since 354 was 317, checked in by nanardon, 15 years ago
  • don't cache db connection at this level
  • Property svn:keywords set to Id Rev
File size: 9.4 KB
Line 
1package LATMOS::Accounts::Synchro;
2
3use 5.010000;
4use strict;
5use warnings;
6use base qw(Config::IniFiles);
7use LATMOS::Accounts::Bases;
8use LATMOS::Accounts::Log;
9use LATMOS::Accounts::Utils qw(exec_command);
10
11=head1 NAME
12
13LATMOS::Accounts::Synchro - Perl extension for blah blah blah
14
15=head1 SYNOPSIS
16
17  use LATMOS::Accounts;
18  blah blah blah
19
20=head1 DESCRIPTION
21
22Stub documentation for LATMOS::Accounts, created by h2xs. It looks like the
23author of the extension was negligent enough to leave the stub
24unedited.
25
26Blah blah blah.
27
28=head1 FUNCTIONS
29
30=cut
31
32our $VERSION = (q$Rev$ =~ /^Rev: (\d+) /)[0];
33
34=head2 new($from, $to, %options)
35
36Create a new synchronisation where $from and $to are LATMOS::Accounts::Base
37based objects. $to can be an array ref of objects.
38
39=cut
40
41sub new {
42    my ($class, $from, $to, %options) = @_;
43
44    if ($options{state_file} && ! -w $options{state_file}) {
45        # don't exists, we have to create it
46        open(my $handle, '>', $options{state_file}) or do {
47            la_log(LA_ERR, "Cannot open status file %s", $options{state_file});
48            return;
49        };
50        print $handle "[_default_]\n";
51        close($handle);
52    }
53
54    my $self = Config::IniFiles->new(
55        $options{state_file}
56        ? (-file => $options{state_file})
57        : (),
58    );
59
60    if ($options{state_file} && !$self->GetFileName) {
61        $self->SetFileName($options{state_file});
62    }
63   
64    $self->{from} = $from or do {
65        la_log(LA_ERR, "No database source");
66        return;
67    };
68    $self->{options} = { %options };
69
70    # allow ref and array ref of, eg
71    # to = $foo and $to = [ $foo, $bar ]
72    foreach (ref($to) eq 'ARRAY' ? @{ $to || []} : ($to)) {
73        push(@{$self->{to}}, $_);
74    }
75    bless($self, $class)
76}
77
78sub name {
79    $_[0]->{options}{name}
80}
81
82sub from {
83    my ($self) = @_;
84    return $self->{from}
85}
86
87sub to {
88    my ($self) = @_;
89    return @{$self->{to} || []};
90}
91
92=head2 load_dest
93
94Try to loaded all base, return the count of filtrered base which cannot
95be loaded
96
97=cut
98
99sub load_dest {
100    my ($self) = @_;
101    my @loaded;
102    my $unloaded = 0;
103    foreach ($self->to) {
104        if($_->load) {
105            push(@loaded, $_);
106        } else {
107            $unloaded++;
108        }
109    }
110    $self->{to} = \@loaded;
111    return $unloaded;
112}
113
114sub _traverse {
115    my ($self, $callback) = @_;
116   
117    # listing existing obj one time:
118    foreach my $otype ($self->from->list_supported_objects) {
119       
120        # If no dest support $otype, we skip
121        my @obj_dest_base = grep {
122            $_->is_supported_object($otype)
123        } $self->to or next;
124
125        # loading object list one time for all
126        # TODO optimize this, using rev of objects
127
128        foreach my $destbase (@obj_dest_base) {
129            $callback->($otype, $destbase);
130        }
131    }
132}
133
134sub _traverse_update {
135    my ($self, %options) = @_;
136    $self->_traverse(
137        sub {
138            my ($otype, $destbase) = @_;
139            my @lobjfrom = $self->from->list_objects_from_rev(
140                $otype,
141                $self->val($self->from->label, $destbase->label),
142            );
143            my @common_fields = $options{attr_cb}->($self->from, $destbase, $otype) or return;
144            my %exists = map { $_ => 1 } $destbase->list_objects($otype);
145            foreach my $uid (@lobjfrom) {
146                my $sobj = $self->from->get_object($otype, $uid);
147                if (!$self->{options}{test}) {
148                    my $res = $destbase->sync_object(
149                        $sobj,
150                        nocreate => ($self->{options}{nocreate} || $options{nocreate}),
151                    );
152                    if (defined $res) {
153                        la_log(LA_INFO,
154                            "%s::%s::%s => %s (%s)",
155                            $self->from->label, $otype, $uid,
156                            $destbase->label, $res
157                        ) if ($res);
158                    } else {
159                        la_log(LA_ERR,
160                            "Error syncing %s::%s::%s => %s",
161                            $self->from->label, $otype, $uid, $destbase->label
162                        ); 
163                    }
164                }
165            }
166        }
167    );
168}
169
170sub _traverse_delete {
171    my ($self, %options) = @_;
172    $self->_traverse(
173        sub {
174            my ($otype, $destbase) = @_;
175            my %exists = map { $_ => 1 } $destbase->list_objects($otype);
176            my %srcexists = map { $_ => 1 } $self->from->list_objects($otype);
177            foreach (keys %exists) {
178                if (!$srcexists{$_}) {
179                    if ($destbase->delete_object($otype, $_)) {
180                        print "delete " . $destbase->label . '::' . $otype . '::' . "$_\n";
181                    } else {
182                        la_log(LA_ERR, "cannot delete " . $destbase->label . '::' . $otype . '::' . "$_");
183                    }
184                }
185            }
186        }
187    );
188}
189
190sub enter_synch_mode {
191    my ($self) = @_;
192    my %state = ();
193    $state{$self->from->label} = $self->from->wexported(0);
194    foreach ($self->to) {
195        $state{$_->label} = $_->wexported(1);
196    }
197    la_log(LA_DEBUG, "Entering synch mode, old state: %s", join(', ', map {
198            "$_ => $state{$_}" } sort keys %state));
199    %state
200}
201
202sub leave_synch_mode {
203    my ($self, %state) = @_;
204    la_log(LA_DEBUG, "Leaving synch mode");
205    $self->from->wexported($state{$self->from->label});
206    foreach ($self->to) {
207        $_->wexported($state{$_->label});
208    }
209}
210
211sub sync_object {
212    my ($self, $otype, $uid) = @_;
213    my %state = $self->enter_synch_mode;
214    if (my $sobj = $self->from->get_object($otype, $uid)) {
215        foreach ($self->to) {
216            if (my $res = $_->sync_object($sobj)) {
217                la_log(LA_INFO, $_->label . " $uid ($otype) $res");
218            } else {
219                la_log(LA_ERR, "error synching $uid ($otype) to " . $_->label);
220            }
221        }
222    } else {
223        foreach ($self->to) {
224            if ($_->get_object($otype, $uid)) {
225                $_->get_object($otype, $uid) or do {
226                    la_log("Cannot sync (delete) %s %s");
227                }
228            }
229        }
230    }
231    foreach ($self->to) {
232        $_->commit;
233    }
234
235    $self->run_post_synchro;
236
237    $self->leave_synch_mode(%state);
238
239    1;
240}
241
242sub process {
243    my ($self) = @_;
244
245    $self->from->load or return;
246    # if any cannot be loaded, return,
247    # TODO we need a way to force if some still can be sync
248    $self->load_dest and return;
249   
250    my %state = $self->enter_synch_mode;
251
252    # tracking current base revision:
253    $self->{current_rev} = $self->from->current_rev;
254
255    my %delayed;
256    $self->_traverse_delete;
257    $self->_traverse_update(
258        attr_cb => sub {
259            my ($from, $to, $otype) = @_;
260            my %fields = ();
261            my %delayed = map { $_ => 1 } $to->delayed_fields($otype);
262            foreach ($from->list_canonical_fields($otype, 'r')) {
263                $delayed{$_} and next;
264                $fields{$_} ||= 0; # avoid
265                $fields{$_}++;
266            }
267            foreach ($to->list_canonical_fields($otype, 'w')) {
268                $delayed{$_} and next;
269                $fields{$_} ||= 0; # avoid
270                $fields{$_}++;
271            }
272            # field having value are in both
273            grep { $fields{$_} == 2 } keys %fields;
274        },
275    );
276    $self->_traverse_update(
277        attr_cb => sub {
278            my ($from, $to, $otype) = @_;
279            my %fields = ();
280            my %delayed = map { $_ => 1 } $to->delayed_fields($otype);
281            foreach ($from->list_canonical_fields($otype, 'r')) {
282                $delayed{$_} or next;
283                $fields{$_} ||= 0; # avoid
284                $fields{$_}++;
285            }
286            foreach ($to->list_canonical_fields($otype, 'w')) {
287                $delayed{$_} or next;
288                $fields{$_} ||= 0; # avoid
289                $fields{$_}++;
290            }
291            # field having value are in both
292            grep { $fields{$_} == 2 } keys %fields;
293        },
294        nocreate => 1,
295    ) unless($self->{options}{nocreate} || $self->{options}{test});
296
297    foreach ($self->to) {
298        la_log(LA_DEBUG, "Calling commit on %s", $_->label);
299        $_->commit or next;
300        $self->newval($self->from->label, $_->label, $self->{current_rev}) if($self->{current_rev});
301    }
302
303    $self->run_post_synchro;
304
305    $self->RewriteConfig
306        if($self->GetFileName && !($self->{options}{nocreate} || $self->{options}{test}));
307   
308    $self->leave_synch_mode(%state);
309
310    1;
311}
312
313sub run_post_synchro {
314    my ($self) = @_;
315
316    $self->{options}{post} or return 1;
317
318    la_log(LA_INFO, "Running post synchro `%s'", $self->{options}{post});
319
320    exec_command($self->{options}{post}, $self->{options});
321}
322
323
3241;
325
326__END__
327# Below is stub documentation for your module. You'd better edit it!
328
329=head1 SEE ALSO
330
331Mention other useful documentation such as the documentation of
332related modules or operating system documentation (such as man pages
333in UNIX), or any relevant external documentation such as RFCs or
334standards.
335
336If you have a mailing list set up for your module, mention it here.
337
338If you have a web site set up for your module, mention it here.
339
340=head1 AUTHOR
341
342Thauvin Olivier, E<lt>olivier.thauvin@latmosipsl.frE<gt>
343
344=head1 COPYRIGHT AND LICENSE
345
346Copyright (C) 2009 by Thauvin Olivier
347
348This library is free software; you can redistribute it and/or modify
349it under the same terms as Perl itself, either Perl version 5.10.0 or,
350at your option, any later version of Perl 5 you may have available.
351
352
353=cut
Note: See TracBrowser for help on using the repository browser.