source: LATMOS-Accounts/lib/LATMOS/Accounts/Synchro.pm @ 861

Last change on this file since 861 was 861, checked in by nanardon, 13 years ago
  • reimport missing files from previous svn
  • Property svn:keywords set to Id Rev
File size: 10.9 KB
Line 
1package LATMOS::Accounts::Synchro;
2
3use 5.010000;
4use strict;
5use warnings;
6use base qw(Config::IniFiles);
7use LATMOS::Accounts::Bases;
8use LATMOS::Accounts::Log;
9use LATMOS::Accounts::Utils qw(exec_command);
10use Fcntl qw(:flock);
11
12=head1 NAME
13
14LATMOS::Accounts::Synchro - Perl extension for blah blah blah
15
16=head1 SYNOPSIS
17
18  use LATMOS::Accounts;
19  blah blah blah
20
21=head1 DESCRIPTION
22
23Stub documentation for LATMOS::Accounts, created by h2xs. It looks like the
24author of the extension was negligent enough to leave the stub
25unedited.
26
27Blah blah blah.
28
29=head1 FUNCTIONS
30
31=cut
32
33our $VERSION = (q$Rev$ =~ /^Rev: (\d+) /)[0];
34
35=head2 new($from, $to, %options)
36
37Create a new synchronisation where $from and $to are LATMOS::Accounts::Base
38based objects. $to can be an array ref of objects.
39
40=cut
41
42sub new {
43    my ($class, $from, $to, %options) = @_;
44
45    my $state_file = $options{state_dir}
46        ? $options{state_dir} . '/synchronisation.ini'
47        : undef;
48    if ($state_file && ! -w $state_file) {
49        # don't exists, we have to create it
50        open(my $handle, '>', $state_file) or do {
51            la_log(LA_ERR, "Cannot open status file %s", $state_file);
52            return;
53        };
54        print $handle "[_default_]\n";
55        close($handle);
56    }
57
58    my $self = Config::IniFiles->new(
59        $state_file
60        ? (-file => $state_file)
61        : (),
62    );
63
64    if ($state_file && !$self->GetFileName) {
65        $self->SetFileName($state_file);
66    }
67   
68    $self->{from} = $from or do {
69        la_log(LA_ERR, "No database source");
70        return;
71    };
72    $self->{options} = { %options };
73
74    # allow ref and array ref of, eg
75    # to = $foo and $to = [ $foo, $bar ]
76    foreach (ref($to) eq 'ARRAY' ? @{ $to || []} : ($to)) {
77        push(@{$self->{to}}, $_);
78    }
79    bless($self, $class)
80}
81
82sub name {
83    $_[0]->{options}{name}
84}
85
86sub from {
87    my ($self) = @_;
88    return $self->{from}
89}
90
91sub to {
92    my ($self) = @_;
93    return @{$self->{to} || []};
94}
95
96=head2 load_dest
97
98Try to loaded all base, return the count of filtrered base which cannot
99be loaded
100
101=cut
102
103sub load_dest {
104    my ($self) = @_;
105    my @loaded;
106    my $unloaded = 0;
107    foreach ($self->to) {
108        if($_->load) {
109            push(@loaded, $_);
110        } else {
111            $unloaded++;
112            warn "Cannot load $_";
113        }
114    }
115    $self->{to} = \@loaded;
116    return $unloaded;
117}
118
119sub enter_synch_mode {
120    my ($self) = @_;
121    $self->from->load or return;
122    # if any cannot be loaded, return,
123    # TODO we need a way to force if some still can be sync
124    $self->load_dest and return;
125    my %state = ();
126    $state{$self->from->label} = $self->from->wexported(
127        $self->{options}{unexported} ? 1 : 0
128    );
129    foreach ($self->to) {
130        $state{$_->label} = $_->wexported(1);
131    }
132    la_log(LA_DEBUG, "Entering synch mode, old state: %s", join(', ', map {
133            "$_ => $state{$_}" } sort keys %state));
134    %state
135}
136
137sub leave_synch_mode {
138    my ($self, %state) = @_;
139    la_log(LA_DEBUG, "Leaving synch mode");
140    $self->from->wexported($state{$self->from->label});
141    foreach my $base (grep { $_ } $self->to) {
142        $base->commit;
143        $base->wexported($state{$base->label});
144    }
145}
146
147sub lock {
148    my ($self) = @_;
149
150    $self->{lock}{handle} and return 1;
151    la_log(LA_DEBUG, "Trying to lock (pid $$)");
152    if ($self->{options}{state_dir}) {
153        my $lockfile = $self->{options}{state_dir} . '/synclock';
154        open(my $handle, '>>', $lockfile) or return;
155        flock($handle, LOCK_EX);
156        $self->{lock}{handle} = $handle;
157        $self->{lock}{filename} = $lockfile;
158        la_log(LA_DEBUG, "lock done (pid $$)");
159        return 1;
160    } else { return 1 }
161}
162
163sub unlock {
164    my ($self) = @_;
165    if (my $handle = $self->{lock}{handle}) {
166        close($handle);
167        delete($self->{lock}{handle});
168        unlink($self->{lock}{filename});
169        delete($self->{lock}{filename});
170        return 1;
171    }
172    return;
173}
174
175sub sync_object {
176    my ($self, $otype, $uid, %options) = @_;
177
178    $self->lock or return;
179
180    my %state = $self->enter_synch_mode;
181   
182    my $res = $self->_sync_object($otype, $uid, %options);
183
184    $self->leave_synch_mode(%state);
185
186    $self->unlock;
187
188    $res;
189}
190
191sub _sync_object {
192    my ($self, $otype, $uid, %options) = @_;
193    foreach ($self->to) {
194        my $res = $_->sync_object_from($self->from, $otype, $uid, %options);
195        if (defined $res) {
196            la_log(LA_NOTICE, $_->label . " $uid ($otype) $res") if ($res);
197            return 1;
198        } else {
199            la_log(LA_ERR, "error synching $uid ($otype) to " . $_->label);
200            return;
201        }
202    }
203}
204
205sub process {
206    my ($self, %options) = @_;
207
208    $self->lock or return;
209   
210    if (!(my $res = $self->run_pre_synchro({}))) {
211        la_log(LA_ERR, "Pre synchro script failed, aborting");
212        $self->unlock;
213        return;
214    }
215
216    my %state = $self->enter_synch_mode;
217
218    # tracking current base revision:
219    $self->{current_rev} = $self->from->current_rev;
220
221    my %desterror;
222    my %existings;
223    my $updated = 0;
224    foreach my $destbase ($self->to) {
225        my %objlist;
226        foreach my $otype ($self->from->list_supported_objects) {
227            $destbase->is_supported_object($otype) or next;
228
229            $existings{$otype} ||= { map { $_ => 1 }
230                $self->from->list_objects($otype) };
231
232            # deleting non existing object in dest:
233            foreach ($destbase->list_objects($otype)) {
234                if(!$existings{$otype}{$_}) {
235                    if (my $res = $destbase->sync_object_from($self->from,
236                            $otype, $_, %options)) {
237                        la_log(LA_NOTICE, "%s::%s::%s => %s %s",
238                            $self->from->label, $otype, $_, $destbase->label, $res,
239                        );
240                        if ($destbase->is_transactionnal) {
241                            $destbase->commit;
242                        }
243                        $updated = 1;
244                    } else {
245                        if ($destbase->is_transactionnal) {
246                            $destbase->rollback;
247                        }
248                    }
249                }
250            }
251
252            @{$objlist{$otype}} = $self->from->list_objects_from_rev(
253                $otype,
254                $self->val($self->from->label, $destbase->label, 0),
255            );
256        }
257        foreach my $pass (1, 0) {
258            foreach my $otype ($destbase->ordered_objects) {
259                exists($objlist{$otype}) or next;
260                foreach (@{$objlist{$otype} || []}) {
261                    my $res = $destbase->sync_object_from($self->from, $otype, $_,
262                        %options, firstpass => $pass);
263                    if (defined $res) {
264                        if ($res) {
265                            la_log(LA_NOTICE, "%s::%s::%s => %s %s",
266                                $self->from->label, $otype, $_,
267                                $destbase->label, $res,
268                            );
269                            if ($destbase->is_transactionnal) {
270                                $destbase->commit;
271                            }
272                            $updated = 1;
273                        }
274                    } else {
275                        la_log(LA_ERR, "Cannot synch %s::%s::%s => %s",
276                            $self->from->label, $otype, $_,
277                            $destbase->label,
278                        );
279                        $desterror{$destbase->label} = 1;
280                        if ($destbase->is_transactionnal) {
281                            $destbase->rollback;
282                        }
283                    }
284
285                }
286            }
287        }
288    }
289
290    $self->leave_synch_mode(%state);
291    my $res = $self->run_post_synchro(
292        {
293            UPDATED => $updated || undef,
294        }
295    );
296    if ($res) {
297        foreach my $destbase ($self->to) {
298            # don't register checkpoint on error
299            if ($desterror{$destbase->label}) { next; }
300            $self->newval($self->from->label, $destbase->label, $self->{current_rev});
301        }
302
303        if(!($self->{options}{nocreate} ||
304                $self->{options}{test})) {
305            $self->write_status;
306        }
307    } else {
308        la_log(LA_ERROR, "Not updating status because post script failed");
309    }
310
311    $self->unlock;
312
313    1;
314}
315
316sub write_status {
317    my ($self) = @_;
318    if (my $file = $self->GetFileName) {
319        open(my $handle, '>', $file) or do {
320            la_log(LA_ERR, "Cannot open status file %s for writing: %s",
321                $file, $!);
322            return;
323        };
324        my $oldfh = select($handle);
325        $self->OutputConfig();
326        select($oldfh);
327        close($handle); 
328        return 1;
329    }
330
331    return 0;
332}
333
334sub run_pre_synchro {
335    my ($self, $env) = @_;
336
337    $env ||= {};
338    $env->{HOOK_TYPE} = 'PRE';
339
340    foreach my $base ($self->to) {
341        if ($base->options('presynchro')) {
342            la_log LA_DEBUG, "Executing base pre synchro `%s' for %s",
343                $base->options('presynchro'), $base->label;
344            exec_command(
345                $base->options('presynchro'),
346                {
347                    BASE => $base->label,
348                    BASETYPE => $base->type,
349                    %{ $env },
350                }
351            );
352        }
353    }
354
355    $self->{options}{pre} or return 1;
356
357    la_log(LA_DEBUG, "Running post synchro `%s'", $self->{options}{pre});
358
359    exec_command($self->{options}{post}, $env);
360}
361
362sub run_post_synchro {
363    my ($self, $env) = @_;
364   
365    $env ||= {};
366    $env->{HOOK_TYPE} = 'PRE';
367
368    foreach my $base ($self->to) {
369        if ($base->options('postsynchro')) {
370            la_log LA_DEBUG, "Executing base post synchro `%s' for %s",
371                $base->options('postsynchro'), $base->label;
372            exec_command(
373                $base->options('postsynchro'),
374                {
375                    BASE => $base->label,
376                    BASETYPE => $base->type,
377                    %{ $env },
378                }
379            );
380        }
381    }
382
383    $self->{options}{post} or return 1;
384
385    la_log(LA_DEBUG, "Running post synchro `%s'", $self->{options}{post});
386   
387    exec_command($self->{options}{post}, $env);
388}
389
390
3911;
392
393__END__
394# Below is stub documentation for your module. You'd better edit it!
395
396=head1 SEE ALSO
397
398Mention other useful documentation such as the documentation of
399related modules or operating system documentation (such as man pages
400in UNIX), or any relevant external documentation such as RFCs or
401standards.
402
403If you have a mailing list set up for your module, mention it here.
404
405If you have a web site set up for your module, mention it here.
406
407=head1 AUTHOR
408
409Thauvin Olivier, E<lt>olivier.thauvin@latmosipsl.frE<gt>
410
411=head1 COPYRIGHT AND LICENSE
412
413Copyright (C) 2009 by Thauvin Olivier
414
415This library is free software; you can redistribute it and/or modify
416it under the same terms as Perl itself, either Perl version 5.10.0 or,
417at your option, any later version of Perl 5 you may have available.
418
419
420=cut
Note: See TracBrowser for help on using the repository browser.