package LATMOS::Accounts::Synchro; use 5.010000; use strict; use warnings; use base qw(Config::IniFiles); use LATMOS::Accounts::Bases; use LATMOS::Accounts::Log; use LATMOS::Accounts::Utils qw(exec_command); use Fcntl qw(:flock); =head1 NAME LATMOS::Accounts::Synchro - Perl extension for blah blah blah =head1 SYNOPSIS use LATMOS::Accounts; blah blah blah =head1 DESCRIPTION Stub documentation for LATMOS::Accounts, created by h2xs. It looks like the author of the extension was negligent enough to leave the stub unedited. Blah blah blah. =head1 FUNCTIONS =cut our $VERSION = (q$Rev$ =~ /^Rev: (\d+) /)[0]; =head2 new($from, $to, %options) Create a new synchronisation where $from and $to are LATMOS::Accounts::Base based objects. $to can be an array ref of objects. =cut sub new { my ($class, $from, $to, %options) = @_; my $state_file = $options{state_dir} ? $options{state_dir} . '/synchronisation.ini' : undef; if ($state_file && ! -w $state_file) { # don't exists, we have to create it open(my $handle, '>', $state_file) or do { la_log(LA_ERR, "Cannot open status file %s", $state_file); return; }; print $handle "[_default_]\n"; close($handle); } my $self = Config::IniFiles->new( $state_file ? (-file => $state_file) : (), ); if ($state_file && !$self->GetFileName) { $self->SetFileName($state_file); } $self->{from} = $from or do { la_log(LA_ERR, "No database source"); return; }; $self->{options} = { %options }; # allow ref and array ref of, eg # to = $foo and $to = [ $foo, $bar ] foreach (ref($to) eq 'ARRAY' ? @{ $to || []} : ($to)) { push(@{$self->{to}}, $_); } bless($self, $class) } sub name { $_[0]->{options}{name} } sub from { my ($self) = @_; return $self->{from} } sub to { my ($self) = @_; return @{$self->{to} || []}; } =head2 load_dest Try to loaded all base, return the count of filtrered base which cannot be loaded =cut sub load_dest { my ($self) = @_; my @loaded; my $unloaded = 0; foreach ($self->to) { if($_->load) { push(@loaded, $_); } else { $unloaded++; warn "Cannot load $_"; } } $self->{to} = \@loaded; return $unloaded; } sub enter_synch_mode { my ($self) = @_; $self->from->load or return; # if any cannot be loaded, return, # TODO we need a way to force if some still can be sync $self->load_dest and return; my %state = (); $state{$self->from->label} = $self->from->wexported( $self->{options}{unexported} ? 1 : 0 ); foreach ($self->to) { $state{$_->label} = $_->wexported(1); } la_log(LA_DEBUG, "Entering synch mode, old state: %s", join(', ', map { "$_ => $state{$_}" } sort keys %state)); %state } sub leave_synch_mode { my ($self, %state) = @_; la_log(LA_DEBUG, "Leaving synch mode"); $self->from->wexported($state{$self->from->label}); foreach my $base (grep { $_ } $self->to) { $base->commit; $base->wexported($state{$base->label}); } } sub lock { my ($self) = @_; $self->{lock}{handle} and return 1; la_log(LA_DEBUG, "Trying to lock (pid $$)"); if ($self->{options}{state_dir}) { my $lockfile = $self->{options}{state_dir} . '/synclock'; open(my $handle, '>>', $lockfile) or return; flock($handle, LOCK_EX); $self->{lock}{handle} = $handle; $self->{lock}{filename} = $lockfile; la_log(LA_DEBUG, "lock done (pid $$)"); return 1; } else { return 1 } } sub unlock { my ($self) = @_; if (my $handle = $self->{lock}{handle}) { close($handle); delete($self->{lock}{handle}); unlink($self->{lock}{filename}); delete($self->{lock}{filename}); return 1; } return; } sub sync_object { my ($self, $otype, $uid, %options) = @_; $self->lock or return; my %state = $self->enter_synch_mode; my $res = $self->_sync_object($otype, $uid, %options); $self->leave_synch_mode(%state); $self->unlock; $res; } sub _sync_object { my ($self, $otype, $uid, %options) = @_; foreach ($self->to) { my $res = $_->sync_object_from($self->from, $otype, $uid, %options); if (defined $res) { la_log(LA_NOTICE, $_->label . " $uid ($otype) $res") if ($res); return 1; } else { la_log(LA_ERR, "error synching $uid ($otype) to " . $_->label); return; } } } sub process { my ($self, %options) = @_; $self->lock or return; if (!(my $res = $self->run_pre_synchro({}))) { la_log(LA_ERR, "Pre synchro script failed, aborting"); $self->unlock; return; } my %state = $self->enter_synch_mode; # tracking current base revision: $self->{current_rev} = $self->from->current_rev; my %desterror; my %existings; my $updated = 0; foreach my $destbase ($self->to) { my %objlist; foreach my $otype ($self->from->list_supported_objects) { $destbase->is_supported_object($otype) or next; $existings{$otype} ||= { map { $_ => 1 } $self->from->list_objects($otype) }; # deleting non existing object in dest: foreach ($destbase->list_objects($otype)) { if(!$existings{$otype}{$_}) { if (my $res = $destbase->sync_object_from($self->from, $otype, $_, %options)) { la_log(LA_NOTICE, "%s::%s::%s => %s %s", $self->from->label, $otype, $_, $destbase->label, $res, ); if ($destbase->is_transactionnal) { $destbase->commit; } $updated = 1; } else { if ($destbase->is_transactionnal) { $destbase->rollback; } } } } @{$objlist{$otype}} = $self->from->list_objects_from_rev( $otype, $self->val($self->from->label, $destbase->label, 0), ); } foreach my $pass (1, 0) { foreach my $otype ($destbase->ordered_objects) { exists($objlist{$otype}) or next; foreach (@{$objlist{$otype} || []}) { my $res = $destbase->sync_object_from($self->from, $otype, $_, %options, firstpass => $pass); if (defined $res) { if ($res) { la_log(LA_NOTICE, "%s::%s::%s => %s %s", $self->from->label, $otype, $_, $destbase->label, $res, ); if ($destbase->is_transactionnal) { $destbase->commit; } $updated = 1; } } else { la_log(LA_ERR, "Cannot synch %s::%s::%s => %s", $self->from->label, $otype, $_, $destbase->label, ); $desterror{$destbase->label} = 1; if ($destbase->is_transactionnal) { $destbase->rollback; } } } } } } $self->leave_synch_mode(%state); my $res = $self->run_post_synchro( { UPDATED => $updated || undef, } ); if ($res) { foreach my $destbase ($self->to) { # don't register checkpoint on error if ($desterror{$destbase->label}) { next; } $self->newval($self->from->label, $destbase->label, $self->{current_rev}); } if(!($self->{options}{nocreate} || $self->{options}{test})) { $self->write_status; } } else { la_log(LA_ERROR, "Not updating status because post script failed"); } $self->unlock; 1; } sub write_status { my ($self) = @_; if (my $file = $self->GetFileName) { open(my $handle, '>', $file) or do { la_log(LA_ERR, "Cannot open status file %s for writing: %s", $file, $!); return; }; my $oldfh = select($handle); $self->OutputConfig(); select($oldfh); close($handle); return 1; } return 0; } sub run_pre_synchro { my ($self, $env) = @_; $env ||= {}; $env->{HOOK_TYPE} = 'PRE'; foreach my $base ($self->to) { if ($base->options('presynchro')) { la_log LA_DEBUG, "Executing base pre synchro `%s' for %s", $base->options('presynchro'), $base->label; exec_command( $base->options('presynchro'), { BASE => $base->label, BASETYPE => $base->type, %{ $env }, } ); } } $self->{options}{pre} or return 1; la_log(LA_DEBUG, "Running post synchro `%s'", $self->{options}{pre}); exec_command($self->{options}{post}, $env); } sub run_post_synchro { my ($self, $env) = @_; $env ||= {}; $env->{HOOK_TYPE} = 'PRE'; foreach my $base ($self->to) { if ($base->options('postsynchro')) { la_log LA_DEBUG, "Executing base post synchro `%s' for %s", $base->options('postsynchro'), $base->label; exec_command( $base->options('postsynchro'), { BASE => $base->label, BASETYPE => $base->type, %{ $env }, } ); } } $self->{options}{post} or return 1; la_log(LA_DEBUG, "Running post synchro `%s'", $self->{options}{post}); exec_command($self->{options}{post}, $env); } 1; __END__ # Below is stub documentation for your module. You'd better edit it! =head1 SEE ALSO Mention other useful documentation such as the documentation of related modules or operating system documentation (such as man pages in UNIX), or any relevant external documentation such as RFCs or standards. If you have a mailing list set up for your module, mention it here. If you have a web site set up for your module, mention it here. =head1 AUTHOR Thauvin Olivier, Eolivier.thauvin@latmosipsl.frE =head1 COPYRIGHT AND LICENSE Copyright (C) 2009 by Thauvin Olivier This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.10.0 or, at your option, any later version of Perl 5 you may have available. =cut