source: trunk/LATMOS-Accounts/lib/LATMOS/Accounts/Synchro.pm @ 2419

Last change on this file since 2419 was 2411, checked in by nanardon, 4 years ago

Fix: objects not synchronised

  • Property svn:keywords set to Id Rev
File size: 15.0 KB
Line 
1package LATMOS::Accounts::Synchro;
2
3use 5.010000;
4use strict;
5use warnings;
6use base qw(Config::IniFiles);
7use LATMOS::Accounts::Bases;
8use LATMOS::Accounts::Log;
9use LATMOS::Accounts::Utils qw(exec_command);
10use Fcntl qw(:flock);
11
12=head1 NAME
13
14LATMOS::Accounts::Synchro - Perl extension for blah blah blah
15
16=head1 SYNOPSIS
17
18  use LATMOS::Accounts;
19  blah blah blah
20
21=head1 DESCRIPTION
22
23Stub documentation for LATMOS::Accounts, created by h2xs. It looks like the
24author of the extension was negligent enough to leave the stub
25unedited.
26
27Blah blah blah.
28
29=head1 FUNCTIONS
30
31=cut
32
33our $VERSION = (q$Rev: 1915 $ =~ /^Rev: (\d+) /)[0];
34
35=head2 new($from, $to, %options)
36
37Create a new synchronisation where $from and $to are LATMOS::Accounts::Base
38based objects. $to can be an array ref of objects.
39
40=cut
41
42sub new {
43    my ($class, $from, $to, %options) = @_;
44
45    my $state_file = $options{state_dir}
46        ? $options{state_dir} . '/synchronisation.ini'
47        : undef;
48    if ($state_file && ! -w $state_file) {
49        # don't exists, we have to create it
50        open(my $handle, '>', $state_file) or do {
51            la_log(LA_ERR, "Cannot open status file %s", $state_file);
52            return;
53        };
54        print $handle "[_default_]\n";
55        close($handle);
56    }
57
58    my $self = Config::IniFiles->new(
59        $state_file
60        ? (-file => $state_file)
61        : (),
62    );
63
64    if ($state_file && !$self->GetFileName) {
65        $self->SetFileName($state_file);
66    }
67   
68    $self->{from} = $from or do {
69        la_log(LA_ERR, "No database source");
70        return;
71    };
72    $self->{options} = { %options };
73
74    # allow ref and array ref of, eg
75    # to = $foo and $to = [ $foo, $bar ]
76    foreach (ref($to) eq 'ARRAY' ? @{ $to || []} : ($to)) {
77        push(@{$self->{to}}, $_);
78    }
79    bless($self, $class)
80}
81
82=head2 name
83
84Return the name of this synchronisation
85
86=cut
87
88sub name {
89    $_[0]->{options}{name}
90}
91
92=head2 from
93
94Return the base object source for this synchronisation
95
96=cut
97
98sub from {
99    my ($self) = @_;
100    return $self->{from}
101}
102
103=head2 to
104
105Return the list of base destination for this synchronisation
106
107=cut
108
109sub to {
110    my ($self) = @_;
111    return @{$self->{to} || []};
112}
113
114=head2 load_dest
115
116Try to loaded all base, return the count of filtrered base which cannot
117be loaded
118
119=cut
120
121sub load_dest {
122    my ($self) = @_;
123    my @loaded;
124    my $unloaded = 0;
125    foreach ($self->to) {
126        if($_->load) {
127            push(@loaded, $_);
128        } else {
129            $unloaded++;
130            warn "Cannot load $_";
131        }
132    }
133    $self->{to} = \@loaded;
134    return $unloaded;
135}
136
137=head2 enter_synch_mode
138
139Configure base for synchronisation
140
141=cut
142
143sub enter_synch_mode {
144    my ($self) = @_;
145    $self->from->load or return;
146    # if any cannot be loaded, return,
147    # TODO we need a way to force if some still can be sync
148    $self->load_dest and return;
149    my %state = ();
150    $state{$self->from->label} = $self->from->wexported(
151        $self->{options}{unexported} ? 1 : 0
152    );
153    foreach ($self->to) {
154        $state{$_->label} = $_->wexported(1);
155    }
156    la_log(LA_DEBUG, "Entering synch mode, old state: %s", join(', ', map {
157            "$_ => $state{$_}" } sort keys %state));
158    %state
159}
160
161=head2 leave_synch_mode (%state)
162
163Retore base to previous state
164
165=cut
166
167sub leave_synch_mode {
168    my ($self, %state) = @_;
169    la_log(LA_DEBUG, "Leaving synch mode");
170    $self->from->wexported($state{$self->from->label});
171    foreach my $base (grep { $_ } $self->to) {
172        $base->wexported($state{$base->label});
173    }
174}
175
176=head2 lock
177
178Create a lock to denied another synchronisation to run
179
180=cut
181
182sub lock {
183    my ($self) = @_;
184
185    $self->{lock}{handle} and return 1;
186    la_log(LA_DEBUG, "Trying to lock (pid $$)");
187    if ($self->{options}{state_dir}) {
188        my $lockfile = $self->{options}{state_dir} . '/synclock';
189        open(my $handle, '>>', $lockfile) or return;
190        flock($handle, LOCK_EX);
191        $self->{lock}{handle} = $handle;
192        $self->{lock}{filename} = $lockfile;
193        la_log(LA_DEBUG, "lock done (pid $$)");
194        return 1;
195    } else { return 1 }
196}
197
198=head2 unlock
199
200Remove lock
201
202=cut
203
204sub unlock {
205    my ($self) = @_;
206    if (my $handle = $self->{lock}{handle}) {
207        close($handle);
208        delete($self->{lock}{handle});
209        unlink($self->{lock}{filename});
210        delete($self->{lock}{filename});
211        return 1;
212    }
213    return;
214}
215
216=head2 sync_object ($otype, $uid, %options)
217
218Synchronise object type C<$otype> named C<$uid>
219
220=cut
221
222sub sync_object {
223    my ($self, $otype, $uid, %options) = @_;
224
225    $self->lock or return;
226
227    my %state = $self->enter_synch_mode;
228   
229    my $res = $self->_sync_object($otype, $uid, %options);
230
231    $self->leave_synch_mode(%state);
232
233    $self->unlock;
234
235    $res;
236}
237
238sub _sync_object {
239    my ($self, $otype, $uid, %options) = @_;
240    foreach ($self->to) {
241        my $res = $_->sync_object_from($self->from, $otype, $uid, %options);
242        if (defined $res) {
243            la_log(LA_NOTICE, $_->label . " $uid ($otype) $res") if ($res);
244            return 1;
245        } else {
246            la_log(LA_ERR, "error synching $uid ($otype) to " . $_->label);
247            return;
248        }
249    }
250}
251
252=head2 process
253
254Run the syncronisation
255
256=cut
257
258sub process {
259    my ($self, %options) = @_;
260
261    $self->lock or return;
262   
263    if (!(my $res = $self->run_pre_synchro({}))) {
264        la_log(LA_ERR, "Pre synchro script failed, aborting");
265        $self->unlock;
266        return;
267    }
268
269    my %state = $self->enter_synch_mode;
270
271    # tracking current base revision:
272    $self->{current_rev} = $self->from->current_rev;
273
274    my %desterror;
275    my $updated = 0;
276
277    # We do base one by one
278
279    foreach my $destbase ($self->to) {
280        my %objlist;
281        foreach my $otype ($self->from->list_supported_objects) {
282            $destbase->is_supported_object($otype) or next;
283
284            my %existings;
285            my %filtering;
286
287            $existings{$otype} = { map { $_ => 1 }
288                $self->from->listRealObjects($otype) };
289
290            # Is there a filter to apply:
291            my $filtername = 'filter.' . $destbase->label . '.' . $otype;
292            if (my $filter = $self->{options}->{$filtername}) {
293                la_log(LA_DEBUG, "Found %s param, using it: %s", $filtername, $filter);
294                $filtering{$otype} = { map { $_ => 1 }
295                    $self->from->search_objects($otype, $filter, 'oalias=NULL') };
296            } else {
297                $filtering{$otype} = $existings{$otype};
298            }
299
300
301            # deleting non existing object in dest:
302
303            # Sync: noDelete.Base = yes          mean no delete
304            #       noDelete.Base.Otype = yes    mean no delete for this object
305            if ($self->{options}->{'noDelete'} ||
306                $self->{options}->{'noDelete.' . $destbase->label} ||
307                $self->{options}->{'noDelete.' . $destbase->label . '.' . $otype}) {
308                la_log(LA_INFO, 'Not deleting object type \'%s` from base \'%s` because %s is set',
309                    $otype,
310                    $destbase->label,
311                    $self->{options}->{'noDelete.' . $destbase->label . '.' . $otype}
312                        ? 'noDelete.' . $destbase->label . '.' . $otype
313                        : $self->{options}->{'noDelete.' . $destbase->label}
314                            ? 'noDelete.' . $destbase->label
315                            : 'noDelete'
316                );
317            } else {
318
319            my $deletefiltered = 'deletefiltered.' . $destbase->label . '.' . $otype;
320
321            foreach ($destbase->listRealObjects($otype)) {
322
323                if ($filtering{$otype}{$_}) {
324                    # the object must exists
325                    next;
326                } elsif ($existings{$otype}{$_}) {
327                    # object exists but is filtered
328                    if (!$self->{options}->{$deletefiltered}) {
329                        next;
330                    }
331                }
332
333                if (my $res = $destbase->sync_object_from($self->from,
334                        $otype, $_, %options)) {
335                    la_log(LA_NOTICE, "%s::%s::%s => %s %s",
336                        $self->from->label, $otype, $_, $destbase->label, $res,
337                    );
338                    if ($destbase->is_transactionnal) {
339                        $destbase->commit;
340                    }
341                    $updated = 1;
342                } else {
343                    if ($destbase->is_transactionnal) {
344                        $destbase->rollback;
345                    }
346                }
347            }
348            } # noDelete.Base
349
350
351            # Finding object to synchronize:
352            {
353                my %ObjList = ();
354
355                # Objects to refresh
356                $ObjList{ $_ } = 1 foreach(grep { $filtering{$otype}{$_} } $self->from->list_objects_from_rev(
357                    $otype,
358                    $self->val($self->from->label, $destbase->label, 0),
359                ));
360
361                my %destExists = ( map { $_ => 1 }
362                    $destbase->listRealObjects($otype) );
363
364                # Objects missing in dest, we try to sync it
365                foreach (keys %{ $existings{ $otype } || {} }) {
366                    $destExists{ $_ } and next;
367                    $filtering{$otype}{$_} or next;
368                    $ObjList{ $_ } = 1;
369                }
370
371                @{$objlist{$otype}} = sort keys %ObjList;
372            }
373        }
374
375        my %objectok;
376        foreach my $pass (1, 0) {
377            foreach my $otype ($destbase->ordered_objects) {
378                exists($objlist{$otype}) or next;
379                foreach (@{$objlist{$otype} || []}) {
380
381                    # If first synchro is not ok, don't retry to sync the object
382                    if ((!$pass) && (!$objectok{$otype}{$_})) {
383                        next;
384                    }
385
386                    my $res = $destbase->sync_object_from($self->from, $otype, $_,
387                        %options, firstpass => $pass);
388                    if (defined $res) {
389                        if ($res) {
390                            la_log(LA_NOTICE, "%s::%s::%s => %s %s",
391                                $self->from->label, $otype, $_,
392                                $destbase->label, $res,
393                            );
394                            if ($destbase->is_transactionnal) {
395                                $destbase->commit;
396                            }
397                            $updated = 1;
398                            $objectok{$otype}{$_} = 1;
399                        }
400                    } else {
401                        la_log(LA_ERR, "Cannot synch %s::%s::%s => %s",
402                            $self->from->label, $otype, $_,
403                            $destbase->label,
404                        );
405                        $desterror{$destbase->label} = 1;
406                        if ($destbase->is_transactionnal) {
407                            $destbase->rollback;
408                        }
409                    }
410                }
411            }
412        }
413        if (!$desterror{$destbase->label}) {
414            $destbase->commit if(!$destbase->is_transactionnal);
415            $self->newval($self->from->label, $destbase->label, $self->{current_rev});
416            if(!($self->{options}{nocreate} || $self->{options}{test})) {
417                $self->write_status;
418                la_log(LA_NOTICE,
419                    "Update synch. status to %s for base %s to %s",
420                    $self->{current_rev} || '(none)', $self->from->label, $destbase->label
421                ); 
422            }
423        }
424    }
425
426    $self->leave_synch_mode(%state);
427    my $res = $self->run_post_synchro(
428        {
429            UPDATED => $updated || undef,
430        }
431    );
432    if ($res) { 
433    } else {
434        la_log(LA_ERROR, "Post synchronization script failed: %s", $res);
435    }
436
437    $self->unlock;
438
439    if (!$res) { # postscript failure
440        return;
441    } elsif (grep { $desterror{$_} } keys %desterror) {
442        # There were errors :\
443        return;
444    } else {
445        return 1;
446    }
447}
448
449=head2 write_status
450
451Write savepoint file
452
453=cut
454
455sub write_status {
456    my ($self) = @_;
457    if (my $file = $self->GetFileName) {
458        open(my $handle, '>', $file) or do {
459            la_log(LA_ERR, "Cannot open status file %s for writing: %s",
460                $file, $!);
461            return;
462        };
463        my $oldfh = select($handle);
464        $self->OutputConfig();
465        select($oldfh);
466        close($handle); 
467        return 1;
468    }
469
470    return 0;
471}
472
473=head2 reset_savepoint
474
475Reset savepoint in status file to force full synchronisation
476
477=cut
478
479sub reset_savepoint {
480    my ($self) = @_;
481    foreach my $destbase ($self->to) {
482            # don't register savepoint on error
483        $self->newval($self->from->label, $destbase->label, 0);
484    }
485    $self->write_status;
486}
487
488=head2 run_pre_synchro
489
490Run task done before synchronisation
491
492=cut
493
494sub run_pre_synchro {
495    my ($self, $env) = @_;
496
497    $env ||= {};
498    $env->{HOOK_TYPE} = 'PRE';
499
500    foreach my $base ($self->to) {
501        if ($base->config('presynchro')) {
502            la_log LA_DEBUG, "Executing base pre synchro `%s' for %s",
503                $base->config('presynchro'), $base->label;
504            exec_command(
505                $base->config('presynchro'),
506                {
507                    BASE => $base->label,
508                    BASETYPE => $base->type,
509                    %{ $env },
510                }
511            );
512        }
513    }
514
515    $self->{options}{pre} or return 1;
516
517    la_log(LA_DEBUG, "Running post synchro `%s'", $self->{options}{pre});
518
519    exec_command($self->{options}{post}, $env);
520}
521
522=head2 run_post_synchro
523
524Run task done after synchronisation
525
526=cut
527
528sub run_post_synchro {
529    my ($self, $env) = @_;
530   
531    $env ||= {};
532    $env->{HOOK_TYPE} = 'PRE';
533
534    foreach my $base ($self->to) {
535        if ($base->config('postsynchro')) {
536            la_log LA_DEBUG, "Executing base post synchro `%s' for %s",
537                $base->config('postsynchro'), $base->label;
538            exec_command(
539                $base->config('postsynchro'),
540                {
541                    BASE => $base->label,
542                    BASETYPE => $base->type,
543                    %{ $env },
544                }
545            );
546        }
547    }
548
549    $self->{options}{post} or return 1;
550
551    la_log(LA_DEBUG, "Running post synchro `%s'", $self->{options}{post});
552   
553    exec_command($self->{options}{post}, $env);
554}
555
556
5571;
558
559__END__
560# Below is stub documentation for your module. You'd better edit it!
561
562=head1 SEE ALSO
563
564Mention other useful documentation such as the documentation of
565related modules or operating system documentation (such as man pages
566in UNIX), or any relevant external documentation such as RFCs or
567standards.
568
569If you have a mailing list set up for your module, mention it here.
570
571If you have a web site set up for your module, mention it here.
572
573=head1 AUTHOR
574
575Thauvin Olivier, E<lt>olivier.thauvin@latmosipsl.frE<gt>
576
577=head1 COPYRIGHT AND LICENSE
578
579Copyright (C) 2009 by Thauvin Olivier
580
581This library is free software; you can redistribute it and/or modify
582it under the same terms as Perl itself, either Perl version 5.10.0 or,
583at your option, any later version of Perl 5 you may have available.
584
585
586=cut
Note: See TracBrowser for help on using the repository browser.