Changeset 333
- Timestamp:
- 01/23/11 23:34:43 (13 years ago)
- Location:
- server/trunk/web
- Files:
-
- 6 added
- 5 edited
- 1 moved
Legend:
- Unmodified
- Added
- Removed
-
server/trunk/web/bin/sophie_scan
r332 r333 6 6 use POSIX ":sys_wait_h"; 7 7 use Getopt::Long; 8 use Sophie::Scan; 8 9 9 10 $ENV{LC_ALL} = 'C'; 10 require Sophie::Base; 11 require Sophie::Base::RpmsPath; 11 require Sophie::Scan::RpmsPath; 12 12 13 13 GetOptions( … … 28 28 } 29 29 30 my $update = 1; 30 31 alarm($updated_inotify); 31 32 while (1) { 32 33 local $SIG{ALRM} = sub { 33 34 alarm($updated_inotify); 35 $update = 1; 34 36 }; 35 if (update_base(keys %modified_paths)) { 36 %modified_paths = (); 37 if ($update) { 38 if (update_base(keys %modified_paths)) { 39 %modified_paths = (); 40 $update = 0; 41 } 37 42 } 38 43 warn "$$ Inotify"; 39 44 $inotify = inotify_path(); 40 $inotify->poll; 45 if ($inotify) { 46 $inotify->poll and $update = 1; 47 } else { 48 sleep(300); 49 } 41 50 } 42 51 … … 44 53 45 54 my $i = Linux::Inotify2->new; 46 my $sophie = Sophie::Base->connect;47 55 48 foreach ($sophie->resultset('Paths')->get_column('path')->all) { 56 my @paths = Sophie::Scan->new->list_paths; 57 if (!@paths) { 58 return; 59 } 60 foreach (@paths) { 61 -d $_ or next; 49 62 $i->watch( 50 63 $_, … … 72 85 if (my $pid = fork()) { 73 86 } else { 74 75 87 alarm 0; 88 my $scan = Sophie::Scan->new; 89 $scan->update_meta_paths; 76 90 my @pkey; 77 91 { 78 my $sophie = Sophie::Base->connect or do { 79 die "cannot read config file\n"; 80 }; 81 @pkey = $sophie->resultset('Paths')->search( 82 path => [ @path ], 83 )->get_column('d_path_key')->all; 84 push(@pkey, $sophie->resultset('Paths')->search({ 85 updated => [ undef, 86 \[ " < now() - '24 hours'::interval"], 87 ], 88 })->get_column('d_path_key')->all); 89 my %uniq = map { $_ => 1 } @pkey; 90 @pkey = keys %uniq; 92 @pkey = $scan->paths_to_keys(@path); 93 94 push(@pkey, $scan->list_unscanned_paths); 91 95 } 92 96 93 97 exit(0) if (!@pkey); 94 98 99 foreach my $pathkey (@pkey) { 100 my $time = time; 101 my @delta = Sophie::Scan::RpmsPath 102 ->new($pathkey, Sophie::Scan->new) 103 ->find_delta; 104 while (my @d = splice(@delta, 0, 25)) { 105 my $scan = Sophie::Scan->new; 106 my $path = Sophie::Scan::RpmsPath->new($pathkey, $scan); 107 $path->update_content(@d); 108 last if (time > $time + 15 * 60); 109 } 95 110 96 97 my $NB_PAR = 1; 98 my @split; 99 my $div = @pkey / $NB_PAR; 100 101 for (my $i = 0; $i < $NB_PAR - 1; $i++) { 102 $split[$i] = [ splice(@pkey, 0, $div) ]; 111 Sophie::Scan::RpmsPath->new($pathkey, Sophie::Scan->new)->set_updated 112 if (!@delta); # update only if we finished 103 113 } 104 $split[$NB_PAR -1] = [ @pkey ];105 foreach my $job (@split) {106 @{$job || []} or next;107 108 if (fork()) {109 } else {110 111 foreach my $pathkey (@{ $job }) {112 my $time = time;113 my @delta = Sophie::Base::RpmsPath->new($pathkey, Sophie::Base->connect)114 ->find_delta;115 while (my @d = splice(@delta, 0, 25)) {116 my $path = Sophie::Base::RpmsPath->new($pathkey, Sophie::Base->connect);117 $path->update_content(@d);118 #$path->db->disconnect;119 # If it take too long time, next path120 last if (time > $time + 15 * 60);121 }122 Sophie::Base::RpmsPath->new($pathkey, Sophie::Base->connect)->set_updated123 if (!@delta); # update only if we finished124 }125 exit(0);126 }127 }128 1 while(waitpid(-1, 0) <= 0);129 114 exit(0); 130 115 } -
server/trunk/web/lib/Sophie/Base.pm
r332 r333 29 29 $connect_info->{user} = $config->{dbuser}; 30 30 $connect_info->{password} = $config->{dbpassword}; 31 $connect_info->{unsafe} = 1;32 31 } 33 $connect_info->{PrintError} = 0;34 $connect_info->{RaiseError} = 0;35 exists($connect_info->{AutoCommit}) or $connect_info->{AutoCommit} = 0;36 32 $class->SUPER::connection( 37 33 $connect_info, … … 45 41 } 46 42 47 48 49 43 1; -
server/trunk/web/lib/Sophie/Base/Result/Paths.pm
r104 r333 6 6 7 7 __PACKAGE__->table('d_path'); 8 __PACKAGE__->add_columns(qw/d_path_key path added updated /);8 __PACKAGE__->add_columns(qw/d_path_key path added updated meta_path exists/); 9 9 __PACKAGE__->set_primary_key('d_path_key'); 10 10 __PACKAGE__->add_unique_constraint('path' => [ 'path' ]); 11 #__PACKAGE__->belongs_to(media => 'Sophie::Base::Result::Medias', 'media');11 __PACKAGE__->belongs_to('MetaPaths' => 'Sophie::Base::Result::MetaPaths', 'meta_path'); 12 12 __PACKAGE__->has_many(MediasPaths => 'Sophie::Base::Result::MediasPaths', 'd_path'); 13 13 __PACKAGE__->has_many(Rpmfiles => 'Sophie::Base::Result::RpmFile', 'd_path'); -
server/trunk/web/lib/Sophie/Base/Result/Rpms.pm
r94 r333 6 6 7 7 __PACKAGE__->table('rpms'); 8 __PACKAGE__->add_columns(qw/pkgid summary description issrc name evr /);8 __PACKAGE__->add_columns(qw/pkgid summary description issrc name evr arch header/); 9 9 __PACKAGE__->set_primary_key(qw/pkgid/); 10 10 __PACKAGE__->has_many(Rpmfile => 'Sophie::Base::Result::RpmFile', 'pkgid'); -
server/trunk/web/lib/Sophie/Base/Result/SrcFiles.pm
r86 r333 7 7 __PACKAGE__->table('srcfiles'); 8 8 __PACKAGE__->add_columns(qw/pkgid count basename md5 user group linkto 9 mode fflags size class color vflags mtime nlink has_content /);9 mode fflags size class color vflags mtime nlink has_content contents/); 10 10 __PACKAGE__->set_primary_key(qw/pkgid count/); 11 11 __PACKAGE__->belongs_to(Rpms => 'Sophie::Base::Result::Rpms', 'pkgid'); -
server/trunk/web/lib/Sophie/Scan/RpmsPath.pm
r332 r333 1 package Sophie:: Base::RpmsPath;1 package Sophie::Scan::RpmsPath; 2 2 3 3 use strict; 4 4 use warnings; 5 use base qw(Sophie::Base);6 5 use Sophie::Base::Header; 7 6 use RPM4; … … 12 11 use Encode; 13 12 use Time::HiRes; 13 use DBD::Pg qw(:pg_types); 14 14 15 15 sub new { … … 21 21 sub key { $_[0]->{key} } 22 22 sub db { 23 $_[0]->{db} ->storage->dbh23 $_[0]->{db} 24 24 } 25 25 … … 27 27 my ($self) = @_; 28 28 29 my $sth = $self->db->prepare_cached( 30 q{select path from d_path where d_path_key = ?} 31 ); 32 $sth->execute($self->key); 33 my $res = $sth->fetchrow_hashref; 34 $sth->finish; 35 return $res->{path} 29 $self->db->base->resultset('Paths')->find( 30 { d_path_key => $self->key } 31 )->path; 36 32 } 37 33 … … 39 35 my ($self) = @_; 40 36 41 my $sth = $self->db->prepare_cached( 42 q{select * from rpmfiles where d_path = ?} 43 ); 44 $sth->execute($self->key); 45 $sth->fetchall_hashref([ 'filename' ]); 37 my %list; 38 foreach ($self->db->base->resultset('RpmFile')->search( 39 { d_path => $self->key } 40 )->get_column('filename')->all) { 41 $list{$_} = 1; 42 } 43 return \%list; 46 44 } 47 45 … … 95 93 @delta; 96 94 } 95 97 96 sub update_content { 98 97 my ($self, @delta) = @_; … … 117 116 sub set_exists { 118 117 my ($self, $exists) = @_; 119 $self->db-> prepare_cached(q{120 update d_path set exists = ? where d_path_key = ?121 })->execute(($exists ? 1 : 0), $self->key);118 $self->db->base->resultset('Paths')->find( 119 { d_path_key => $self->key } 120 )->update({ 'exists' => ($exists ? 1 : 0) }); 122 121 $self->db->commit; 123 122 } … … 126 125 my ($self) = @_; 127 126 warn "$$ UPD"; 128 $self->db-> prepare_cached(q{129 update d_path set updated = now() where d_path_key = ?130 })->execute($self->key);127 $self->db->base->resultset('Paths')->find( 128 { d_path_key => $self->key } 129 )->update({ 'updated' => \'now()' }); 131 130 $self->db->commit; 132 131 } 133 134 132 135 133 sub remove_rpm { 136 134 my ($self, $rpm) = @_; 137 135 warn "$$ deleting $rpm"; 138 my $remove = $self->db->prepare_cached( 139 q{ 140 DELETE FROM rpmfiles where d_path = ? and filename = ? 136 $self->db->base->storage->txn_do( 137 sub { 138 139 $self->db->base->resultset('RpmFile')->search( 140 { d_path => $self->key, filename => $rpm } 141 )->delete; 141 142 } 142 143 ); 143 for (1 .. 3) {144 if ($remove->execute($self->key, $rpm)) {145 $self->db->commit;146 return 1;147 }148 $self->db->rollback;149 }150 144 } 151 145 … … 154 148 155 149 warn "$$ adding $rpm"; 156 for (1 .. 3) { 157 if (defined(my $pkgid = $self->_add_header($rpm))) { 158 $pkgid or return; 159 my $register = $self->db->prepare_cached( 160 q{ 161 INSERT INTO rpmfiles (d_path, filename, pkgid) 162 values (?,?,?) 163 } 164 ); 165 $register->execute($self->key, $rpm, $pkgid) and do { 166 $self->db->commit; 167 return 1; 150 eval { 151 my ($pkgid, $new) = $self->db->base->storage->txn_do( 152 sub { 153 my ($pkgid, $new) = $self->_add_header($rpm); 154 if (defined($pkgid)) { 155 $pkgid or return; 156 $self->db->base->resultset('RpmFile')->create( 157 { 158 d_path => $self->key, 159 filename => $rpm, 160 pkgid => $pkgid, 161 } 162 ); 163 return $pkgid, $new; 168 164 } 169 170 } 171 $self->db->rollback; 165 }, 166 ); 167 foreach my $plugins (qw'sources') { 168 my $mod = ucfirst(lc($plugins)); 169 eval "require Sophie::Scan::RpmParser::$mod;"; 170 warn $@ if($@); 171 eval { 172 my $parser = "Sophie::Scan::RpmParser::$mod"->new($self->db); 173 $parser->run($self->path . '/' . $rpm, $pkgid, $new); 174 } 175 } 172 176 } 173 177 } … … 186 190 187 191 { 188 my $find = $self->db->prepare_cached(q{ 189 select pkgid from rpms where pkgid = ? 190 }); 191 $find->execute($header->queryformat('%{PKGID}')); 192 my $rows = $find->rows; 193 $find->finish; 194 if ($rows) { 192 my $find = $self->db->base->resultset('Rpms')->search( 193 { pkgid => $header->queryformat('%{PKGID}') } 194 )->get_column('pkgid')->all; 195 if ($find) { 195 196 warn "$$ Find"; 196 return $header->queryformat('%{PKGID}');197 return($header->queryformat('%{PKGID}'), 0); 197 198 } 198 199 } … … 204 205 while (read($tmp, my $str, 1024)) { $string .= $str } 205 206 $tmp = undef; 206 my $add_header = $self->db->prepare_cached(207 q{208 INSERT into rpms (pkgid, name, header, evr, arch, issrc, description, summary)209 values (?,?,rpmheader_in(decode(?, 'hex')::bytea),?,?,?,?,?)210 }211 );212 207 my $description = $header->queryformat('%{DESCRIPTION}'); 213 208 { … … 221 216 } 222 217 223 $ add_header->execute(224 $header->queryformat('%{PKGID}'),225 $header->queryformat('%{name}'),226 unpack('H*', $string),227 $header->queryformat('%|EPOCH?{%{EPOCH}:}:{}|%{VERSION}-%{RELEASE}'),228 $header->queryformat('%{ARCH}'),229 $header->hastag('SOURCERPM') ? 'f' : 't',230 $description,231 $summary,232 ) or return;233 my $index_tag = $self->db-> prepare_cached(218 $self->db->base->resultset('Rpms')->create({ 219 pkgid => $header->queryformat('%{PKGID}'), 220 name => $header->queryformat('%{name}'), 221 header => \sprintf(qq{rpmheader_in(decode('%s', 'hex')::bytea)}, unpack('H*', $string)), 222 evr => $header->queryformat('%|EPOCH?{%{EPOCH}:}:{}|%{VERSION}-%{RELEASE}'), 223 arch => $header->queryformat('%{ARCH}'), 224 issrc => $header->hastag('SOURCERPM') ? 'f' : 't', 225 description => $description, 226 summary => $summary, 227 }); 228 my $index_tag = $self->db->base->storage->dbh->prepare_cached( 234 229 q{ 235 230 select index_rpms(?); … … 238 233 $index_tag->execute($header->queryformat('%{PKGID}')) or return; 239 234 $index_tag->finish; 240 if (!$header->hastag('SOURCERPM')) { 241 Sophie::Base::Header->new($header->queryformat('%{PKGID}'), $self->{db}) 242 ->addfiles_content({ path => $self->path, filename => $rpm}) or return; 243 } 244 245 $header->queryformat('%{PKGID}'); 235 236 return($header->queryformat('%{PKGID}'), 1); 246 237 } 247 238
Note: See TracChangeset
for help on using the changeset viewer.