Ensembl/ensembl-hive

View on GitHub
modules/Bio/EnsEMBL/Hive/DBSQL/BaseAdaptor.pm

Summary

Maintainability
Test Coverage
=pod 

=head1 NAME

    Bio::EnsEMBL::Hive::DBSQL::BaseAdaptor

=head1 DESCRIPTION

    The base class for all other Object- or NakedTable- adaptors.
    Performs the low-level SQL needed to retrieve and store data in tables.

=head1 EXTERNAL DEPENDENCIES

    DBI 1.6

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2021] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=cut


package Bio::EnsEMBL::Hive::DBSQL::BaseAdaptor;

use strict;
use warnings;
no strict 'refs';   # needed to allow AUTOLOAD create new methods
use DBI 1.6;        # the 1.6 functionality is important for detecting autoincrement fields and other magic.

use Bio::EnsEMBL::Hive::Utils ('stringify', 'throw');


# Fallback consulted by table_name() when no table name has been set.
# The base class has nothing sensible to offer, so it throws; the message
# tells the adaptor author how to supply a name.
sub default_table_name {
    my $complaint = "Please define table_name either by setting it via table_name() method or by redefining default_table_name() in your adaptor class";

    return throw($complaint);
}


# Insertion method used by insertion_method() when none was set explicitly.
# store() turns the underscore into a space before building the statement.
sub default_insertion_method {
    my $method_name = 'INSERT_IGNORE';

    return $method_name;
}


# Default for overflow_limit(): a hashref keyed by column name.
# The base class declares no overflow columns, so the hash is empty;
# the commented entries show the expected shape for an override.
sub default_overflow_limit {
    my %limit_of = (
        # 'overflow_column1_name' => column1_size,
        # 'overflow_column2_name' => column2_size,
        # ...
    );

    return \%limit_of;
}

# Default for input_column_mapping(): a hashref from a column name to the
# SQL expression that should be selected in its place.
# The base class maps nothing; the commented entries show the expected shape.
sub default_input_column_mapping {
    my %expression_for = (
        # 'original_column1' => "original_column1*10 AS c1_times_ten",
        # 'original_column2' => "original_column2+1 AS c2_plus_one",
        # ...
    );

    return \%expression_for;
}

# Names of columns that updatable_column_list() must leave out in addition
# to the primary key.  The base class excludes nothing.
sub do_not_update_columns {
    my @frozen_columns = ();

    return \@frozen_columns;
}

# ---------------------------------------------------------------------------

# Constructor.
#
#   $class   - the adaptor class to instantiate
#   $dbobj   - either a connection object (its class name ends in
#              'DBConnection') or any object that can('dbc'); in the latter
#              case the object itself is also remembered via db()
#   %options - name => value pairs; every option whose name matches a method
#              of the new adaptor and whose value is defined is applied by
#              calling that method with the value.  Other options are ignored.
#
# Throws if $dbobj is missing, is not a reference, or is of neither kind.
sub new {
    my ($class, $dbobj, %options) = @_;

    if ( !defined $dbobj || !ref $dbobj ) {
            # '// ""' keeps the message text unchanged while avoiding an
            # "uninitialized value" warning when no $dbobj was given at all:
        throw("Don't have a db [" . ($dbobj // '') . "] for new adaptor");
    }

    my $self = bless {}, $class;

    if ( ref($dbobj) =~ /DBConnection$/ ) {
        $self->dbc($dbobj);
    } elsif( UNIVERSAL::can($dbobj, 'dbc') ) {
        $self->dbc( $dbobj->dbc );
        $self->db( $dbobj );
    } else {
        throw("I was given [$dbobj] for a new adaptor");
    }

    foreach my $option_name (keys %options) {
        next unless UNIVERSAL::can( $self, $option_name );

        my $option_value = $options{ $option_name };
        $self->$option_name( $option_value ) if defined $option_value;
    }

    return $self;
}


# Combined getter/setter for the database object stored under {_db}.
# Any argument (even undef) is stored; the current value is returned.
sub db {
    my ($self, @new_value) = @_;

    $self->{_db} = $new_value[0] if @new_value;

    return $self->{_db};
}


# Combined getter/setter for the connection object stored under {_dbc}.
# Any argument (even undef) is stored; the current value is returned.
sub dbc {
    my ($self, @new_value) = @_;

    $self->{_dbc} = $new_value[0] if @new_value;

    return $self->{_dbc};
}


# Prepares $sql on this adaptor's connection and returns whatever the
# connection's prepare() returns.
sub prepare {
    my $self = shift @_;
    my $sql  = shift @_;

    # To cancel caching on the SQL side (e.g. for timing comparisons),
    # uncomment the next line:
    #$sql =~ s/SELECT/SELECT SQL_NO_CACHE/i;

    my $connection = $self->dbc;

    return $connection->prepare($sql);
}


# Combined getter/setter for the overflow-limit hash.
# Returns the stored value when it is true, otherwise the class default
# from default_overflow_limit().
sub overflow_limit {
    my ($self, @new_value) = @_;

    $self->{_overflow_limit} = $new_value[0] if @new_value;

    my $explicit = $self->{_overflow_limit};
    return $explicit if $explicit;

    return $self->default_overflow_limit();
}


# Combined getter/setter for the input column mapping.
# Returns the stored value when it is true, otherwise the class default
# from default_input_column_mapping().
sub input_column_mapping {
    my ($self, @new_value) = @_;

    $self->{_input_column_mapping} = $new_value[0] if @new_value;

    my $explicit = $self->{_input_column_mapping};
    return $explicit if $explicit;

    return $self->default_input_column_mapping();
}


# Combined getter/setter for the table this adaptor works on.
# Setting a name immediately reloads the table information through
# _table_info_loader().  Returns the stored name when it is true,
# otherwise whatever default_table_name() yields.
sub table_name {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{_table_name} = $new_value[0];
        $self->_table_info_loader();
    }

    my $explicit = $self->{_table_name};
    return $explicit if $explicit;

    return $self->default_table_name();
}


# Combined getter/setter for the insertion method name.
# Returns the stored value when it is true, otherwise the class default
# from default_insertion_method().
sub insertion_method {
    my ($self, @new_value) = @_;

    $self->{_insertion_method} = $new_value[0] if @new_value;

    my $explicit = $self->{_insertion_method};
    return $explicit if $explicit;

    return $self->default_insertion_method();
}


# Combined getter/setter for the column set (a hashref keyed by column name).
# When read before anything has been stored, _table_info_loader() is asked
# to fill it in first.
sub column_set {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{_column_set} = $new_value[0];
    } else {
        $self->_table_info_loader() unless defined $self->{_column_set};
    }

    return $self->{_column_set};
}


# Combined getter/setter for the list of primary key columns (an arrayref);
# the key is not necessarily auto-incrementing.
# When read before anything has been stored, _table_info_loader() is asked
# to fill it in first.
sub primary_key {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{_primary_key} = $new_value[0];
    } else {
        $self->_table_info_loader() unless defined $self->{_primary_key};
    }

    return $self->{_primary_key};
}


# Returns an arrayref with every column that is neither part of the primary
# key nor listed by do_not_update_columns().
# This is only a cached view computed on first use; it cannot be set directly.
sub updatable_column_list {
    my ($self) = @_;

    if (!$self->{_updatable_column_list}) {
        my %is_excluded;
        $is_excluded{$_} = 1 foreach @{ $self->primary_key };
        $is_excluded{$_} = 1 foreach @{ $self->do_not_update_columns };

        my @updatable;
        foreach my $column_name (keys %{ $self->column_set() }) {
            push @updatable, $column_name unless $is_excluded{$column_name};
        }

        $self->{_updatable_column_list} = \@updatable;
    }

    return $self->{_updatable_column_list};
}


# Combined getter/setter for the name of the auto-increment column.
# When read before anything has been stored, _table_info_loader() is asked
# to fill it in first.
sub autoinc_id {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{_autoinc_id} = $new_value[0];
    } else {
        $self->_table_info_loader() unless defined $self->{_autoinc_id};
    }

    return $self->{_autoinc_id};
}


# Inspects the current table through the DBI handle and refreshes three
# cached attributes of the adaptor:
#   column_set  - hashref: column name => type name
#   primary_key - arrayref of primary key column names, as reported by DBI
#   autoinc_id  - name of the column called '<table_name>_id' (or
#                 'analysis_id' for the 'analysis_base' table), else ''
#
# Throws if the driver returns no statement handle for the column query.
sub _table_info_loader {
    my $self = shift @_;

    my $dbh         = $self->dbc()->db_handle();
    my $table_name  = $self->table_name();

    my %column_set  = ();
    my $autoinc_id  = '';
    my @primary_key = $dbh->primary_key(undef, undef, $table_name);

        # column_info() returns undef on failure; fail with a readable message
        # rather than with "Can't call method on an undefined value":
    my $sth = $dbh->column_info(undef, undef, $table_name, '%')
        or throw("Could not obtain column information for table '$table_name'");
    $sth->execute();
    while (my $row = $sth->fetchrow_hashref()) {
        my ( $column_name, $column_type ) = @$row{'COLUMN_NAME', 'TYPE_NAME'};

        # warn "ColumnInfo [$table_name/$column_name] = $column_type\n";

        $column_set{$column_name}  = $column_type;

            # the auto-increment column is recognised purely by its name:
        if( ($column_name eq $table_name.'_id')
         or ($table_name eq 'analysis_base' and $column_name eq 'analysis_id') ) {    # a special case (historical)
            $autoinc_id = $column_name;
        }
    }
    $sth->finish;

    $self->column_set(  \%column_set );
    $self->primary_key( \@primary_key );
    $self->autoinc_id(   $autoinc_id );
}


# Counts the rows of the table, optionally restricted and/or grouped.
#
#   $constraint - optional SQL fragment; it is prefixed with WHERE unless it
#                 contains the word JOIN
#   $key_list   - optional arrayref of columns to GROUP BY
#
# Without $key_list the plain count is returned (0 when nothing was fetched).
# With $key_list the result is a nested hashref, one level per key column,
# whose leaves are the counts ({} when nothing was fetched).
sub count_all {
    my ($self, $constraint, $key_list) = @_;

    my $table_name     = $self->table_name();
    my $count_col_name = ($self->dbc->driver() eq 'pgsql') ? 'count' : 'COUNT(*)';

    my @sql_parts = ('SELECT ');
    if ($key_list) {
        push @sql_parts, map { $_ . ', ' } @$key_list;
    }
    push @sql_parts, "COUNT(*) FROM $table_name";

    if ($constraint) {
            # a constraint containing any kind of JOIN (regular, LEFT, RIGHT, etc) must not get WHERE in front:
        my $glue = ($constraint =~ /\bJOIN\b/i) ? ' ' : ' WHERE ';
        push @sql_parts, $glue, $constraint;
    }

    if ($key_list) {
        push @sql_parts, ' GROUP BY ', join(', ', @$key_list);
    }

    my $sql = join('', @sql_parts);
    # warn "SQL: $sql\n";

    my $sth = $self->prepare($sql);
    $sth->execute;

    my $result_struct;  # will be autovivified to the correct data structure

    while (my $row = $sth->fetchrow_hashref) {
        my $slot = \$result_struct;
        foreach my $key_column ($key_list ? @$key_list : ()) {
            $slot = \$$slot->{ $row->{$key_column} };   # a pointer-to-pointer enforces same-level vivification
        }
        $$slot = $row->{$count_col_name};
    }

    return $result_struct if defined($result_struct);

        # nothing was fetched: return an empty structure of the expected kind
    if ($key_list and scalar(@$key_list)) {
        return {};
    }
    return 0;
}


# Fetches rows of the table and turns them into objects (or plain values).
#
#   $constraint   - optional SQL fragment appended after the table name.
#                   It is prefixed with WHERE unless it contains a JOIN or
#                   already starts with WHERE, LIMIT, ORDER or GROUP.
#   $one_per_key  - true: each slot holds a single object; false: each slot
#                   holds an arrayref of objects
#   $key_list     - optional arrayref of columns by which the result is
#                   hashed (one hash level per column)
#   $value_column - optional column name; when given, the value of that
#                   column is returned instead of an objectified row
#
# Returns a hashref when $key_list is non-empty; otherwise a single item
# (undef when nothing was fetched) if $one_per_key is true, or an arrayref.
sub fetch_all {
    my ($self, $constraint, $one_per_key, $key_list, $value_column) = @_;

    my $table_name              = $self->table_name();
    my $input_column_mapping    = $self->input_column_mapping();

    my $sql = 'SELECT ' . join(', ', map { $input_column_mapping->{$_} // "$table_name.$_" } keys %{$self->column_set()}) . " FROM $table_name";

    if($constraint) {
            # Do not put WHERE in front of a constraint that contains any kind of JOIN (regular, LEFT, RIGHT, etc)
            # or that itself begins with a clause keyword.  The keyword alternatives are grouped and anchored,
            # so that ORDER or GROUP occurring somewhere inside a value (e.g. status='REORDERED') no longer
            # suppresses the WHERE.
        my $is_standalone_clause = $constraint =~ /\bJOIN\b/i
                                || $constraint =~ /^\s*(?:WHERE|LIMIT|ORDER|GROUP)\b/i;

        $sql .= ($is_standalone_clause ? ' ' : ' WHERE ') . $constraint;
    }

    # warn "SQL: $sql\n";

    my $sth = $self->prepare($sql);
    $sth->execute;

    my @overflow_columns = keys %{ $self->overflow_limit() };
    my $overflow_adaptor = scalar(@overflow_columns) && $self->db->get_AnalysisDataAdaptor();

    my $result_struct;  # will be autovivified to the correct data structure

    while(my $hashref = $sth->fetchrow_hashref) {

            # values that overflowed are stored as a reference into analysis_data; fetch the real data:
        foreach my $overflow_key (@overflow_columns) {
            my $stored_value = $hashref->{$overflow_key};
            if(defined($stored_value) and $stored_value =~ /^_ext(?:\w+)_data_id (\d+)$/) {   # NULLs are left alone
                $hashref->{$overflow_key} = $overflow_adaptor->fetch_by_analysis_data_id_TO_data($1);
            }
        }

        my $pptr = \$result_struct;
        if($key_list) {
            foreach my $syll (@$key_list) {
                $pptr = \$$pptr->{$hashref->{$syll}};   # using pointer-to-pointer to enforce same-level vivification
            }
        }
        my $object = $value_column
            ? $hashref->{$value_column}
            : $self->objectify($hashref);

        if(UNIVERSAL::can($object, 'seconds_since_last_fetch')) {
            $object->seconds_since_last_fetch(0);
        }

        if($one_per_key) {
            $$pptr = $object;
        } else {
            push @$$pptr, $object;
        }
    }
    $sth->finish;

        # nothing was fetched: return an empty structure of the expected kind
    unless(defined($result_struct)) {
        if($key_list and scalar(@$key_list)) {
            $result_struct = {};
        } elsif(!$one_per_key) {
            $result_struct = [];
        }
    }

    return $result_struct;  # either listref or hashref is returned, depending on the call parameters
}


# Builds the SQL condition that pins down one row by its primary key.
#
#   $sliceref - arrayref of key values.  Attention: their order must match
#               the order of the primary key columns in the table definition!
#
# Returns "col1='val1' AND col2='val2' ...".  Throws if the table has no
# primary key.
sub primary_key_constraint {
    my ($self, $sliceref) = @_;

    my $primary_key = $self->primary_key();

    unless (@$primary_key) {
        my $table_name = $self->table_name();
        return throw("Table '$table_name' doesn't have a primary_key");
    }

    my @conditions;
    foreach my $position (0 .. $#{$primary_key}) {
        push @conditions, $primary_key->[$position] . "='" . $sliceref->[$position] . "'";
    }

    return join(' AND ', @conditions);
}


# Fetches the single object identified by the given primary key values
# (one argument per primary key column, in table-definition order).
sub fetch_by_dbID {
    my ($self, @key_values) = @_;

    my @constraint = $self->primary_key_constraint( \@key_values );

    return $self->fetch_all( @constraint, 1 );     # the trailing 1 asks for one object rather than a list
}


# Removes the entries that satisfy a constraint.
#
#   $constraint - SQL condition for the WHERE clause; a missing or false
#                 constraint is replaced by 1, i.e. every row is removed
sub remove_all {
    my ($self, $constraint) = @_;

    $constraint ||= 1;

    my $sql = 'DELETE FROM ' . $self->table_name() . " WHERE $constraint";

    my $sth = $self->prepare($sql);
    $sth->execute();

    return $sth->finish();
}


# Removes the row that corresponds to $object, located by its primary key.
# Nothing is done (and nothing is returned) for an object that has an
# adaptor() method but either no adaptor or an adaptor other than this one.
sub remove {
    my ($self, $object) = @_;

    if (UNIVERSAL::can($object, 'adaptor')) {
            # the object hasn't actually been stored yet / in this database
        return unless $object->adaptor;
        return if $object->adaptor != $self;
    }

    my $key_columns             = $self->primary_key();
    my $key_values              = $self->slicer($object, $key_columns);
    my $primary_key_constraint  = $self->primary_key_constraint( $key_values );

    return $self->remove_all( $primary_key_constraint );
}


# Updates columns of the row that corresponds to $object, located by its
# primary key.
#
#   $object  - the object providing both the key and the new values
#   @columns - names of the columns to update; when none are given, every
#              column from updatable_column_list() is updated
#
# Throws when there is nothing to update.
sub update {
    my ($self, $object, @columns) = @_;

    my $table_name              = $self->table_name();
    my $key_values              = $self->slicer($object, $self->primary_key());
    my $primary_key_constraint  = $self->primary_key_constraint( $key_values );

    my $columns_to_update       = @columns ? \@columns : $self->updatable_column_list();
    my $values_to_update        = $self->slicer( $object, $columns_to_update );

    if (!@$columns_to_update) {
        throw("There are no dependent columns to update, as everything seems to belong to the primary key");
    }

        # the new values are bound through placeholders, the key is part of the statement text:
    my @assignments = map { "$_=?" } @$columns_to_update;
    my $sql = "UPDATE $table_name SET " . join(', ', @assignments) . " WHERE $primary_key_constraint";
    # warn "SQL: $sql\n";

    my $sth = $self->prepare($sql);
    # warn "VALUES_TO_UPDATE: ".join(', ', map { "'$_'" } @$values_to_update)."\n";
    $sth->execute( @$values_to_update );

    return $sth->finish();
}


# Makes sure the database reflects $object, choosing between three routes:
#   1. the object's adaptor is this adaptor (it looks previously stored):
#      update it, provided the table has a primary key and updatable columns;
#   2. an entry with matching content is found (compared on $filter_columns,
#      see check_object_present_in_db_by_content()): mark the object as
#      stored and update it under the same proviso;
#   3. otherwise: store it.
sub store_or_update_one {
    my ($self, $object, $filter_columns) = @_;

    my $previously_stored = (UNIVERSAL::can($object, 'adaptor') and $object->adaptor and $object->adaptor==$self);

    if ($previously_stored) {
        if ( @{ $self->primary_key() } and @{ $self->updatable_column_list() } ) {
            $self->update( $object );
        }
    }
    elsif ( my $present = $self->check_object_present_in_db_by_content( $object, $filter_columns ) ) {
        $self->mark_stored($object, $present);

        if ( @{ $self->primary_key() } and @{ $self->updatable_column_list() } ) {
            $self->update( $object );
        }
    }
    else {
        $self->store( $object );
    }
}


# Looks for a row whose content matches $object.
#
#   $object         - the object to compare against the table
#   $filter_columns - optional arrayref of field names to compare on; a name
#                     that is not a column of the table gets '_id' appended.
#                     Without it every column except the autoinc_id column
#                     is compared.
#
# Returns the first selected value of the first matching row: the autoinc_id
# if the table has one, otherwise 1.  Returns undef when nothing matches.
sub check_object_present_in_db_by_content {
    my ($self, $object, $filter_columns) = @_;

    my $table_name  = $self->table_name();
    my $column_set  = $self->column_set();
    my $autoinc_id  = $self->autoinc_id();

    my @columns_to_match;
    if ($filter_columns) {
            # make sure all fields exist in the database as columns:
        foreach my $field_name (@$filter_columns) {
            push @columns_to_match, ($column_set->{$field_name} ? $field_name : $field_name.'_id');
        }
    } else {
            # we look for identical contents, so must skip the autoinc_id columns when fetching:
        @columns_to_match = grep { $_ ne $autoinc_id } keys %$column_set;
    }

    my %value_of;
    @value_of{ @columns_to_match } = @{ $self->slicer( $object, \@columns_to_match ) };

        # defined values are bound via placeholders, undefined ones have to be matched with IS NULL:
    my (@constraints, @values);
    foreach my $column (keys %value_of) {
        my $value = $value_of{$column};
        if (defined($value)) {
            push @constraints, "$column = ?";
            push @values, $value;
        } else {
            push @constraints, "$column IS NULL";
        }
    }

    my $selected = $autoinc_id || 1;
    my $sql = "SELECT $selected FROM $table_name WHERE " . join(' AND ', @constraints);
    my $sth = $self->prepare( $sql );
    $sth->execute( @values );

    my ($return_value) = $sth->fetchrow_array();
    $sth->finish;

    return $return_value;
}


# Stores one object or a list of objects in the table.
#
#   $object_or_list - a single object or an arrayref of objects
#
# Each object is inserted with the adaptor's insertion method (INSERT,
# INSERT_IGNORE or REPLACE, adapted to the driver).  Statements are prepared
# lazily and shared by objects that store the same set of columns.  An object
# whose insertion affected at least one row is passed to mark_stored()
# together with the last inserted id (if the table has an autoinc_id).
#
# Returns the list ($object_or_list, $number_of_objects_stored_this_time),
# or ([], 0) when given an empty list.  Throws if a statement cannot be
# prepared or executed.
sub store {
    my ($self, $object_or_list) = @_;

    my $objects = (ref($object_or_list) eq 'ARRAY')     # ensure we get an array of objects to store
        ? $object_or_list
        : [ $object_or_list ];
    return ([], 0) unless(scalar(@$objects));

    my $table_name              = $self->table_name();
    my $autoinc_id              = $self->autoinc_id();
    my $driver                  = $self->dbc->driver();
    my $insertion_method        = $self->insertion_method;  # INSERT, INSERT_IGNORE or REPLACE
    $insertion_method           =~ s/_/ /g;
    if($driver eq 'sqlite') {
        $insertion_method =~ s/INSERT IGNORE/INSERT OR IGNORE/ig;
    } elsif($driver eq 'pgsql') {   # FIXME! temporary hack
        $insertion_method = 'INSERT';
    }

    my %hashed_sth = ();  # do not prepare statements until there is a real need

    my $stored_this_time        = 0;

    foreach my $object (@$objects) {
        my ($columns_being_stored, $column_key) = $self->keys_to_columns($object);
        # warn "COLUMN_KEY='$column_key'\n";

            # only prepare (once!) if we get here:
        my $this_sth = $hashed_sth{$column_key};
        unless($this_sth) {
                # By using question marks we can insert true NULLs by setting corresponding values to undefs:
            my $sql = "$insertion_method INTO $table_name (".join(', ', @$columns_being_stored).') VALUES ('.join(',', (('?') x scalar(@$columns_being_stored))).')';
            # warn "STORE: $sql\n";
            $this_sth = $hashed_sth{$column_key} = $self->prepare( $sql ) or throw("Could not prepare statement: $sql");
        }

        # warn "STORED_COLUMNS: ".stringify($columns_being_stored)."\n";
        my $values_being_stored = $self->slicer( $object, $columns_being_stored );
        # warn "STORED_VALUES: ".stringify($values_being_stored)."\n";

        my $return_code = $this_sth->execute( @$values_being_stored )
                # using $return_code in boolean context allows to skip the value '0E0' ('no rows affected') that Perl treats as zero but regards as true.
                # NULLs are shown as 'undef' in the message, so that reporting the failure does not itself raise warnings:
            or throw("Could not store fields\n\t{$column_key}\nwith data:\n\t(".join(',', map { $_ // 'undef' } @$values_being_stored).')');
        if($return_code > 0) {     # <--- for the same reason we have to be explicitly numeric here
            my $liid = $autoinc_id && $self->dbc->db_handle->last_insert_id(undef, undef, $table_name, $autoinc_id);
            $self->mark_stored($object, $liid );
            ++$stored_this_time;
        }
    }

    foreach my $sth (values %hashed_sth) {
        $sth->finish();
    }

    return ($object_or_list, $stored_this_time);
}


# Explicit no-op destructor.  Because DESTROY is defined here, destroying an
# adaptor never reaches AUTOLOAD, which therefore needs no special case for it.
sub DESTROY { }   # to simplify AUTOLOAD

# Creates missing methods on first call, driven by the method's name.
# The generated closure is installed under the full name held in $AUTOLOAD
# and the call is then restarted with it, so AUTOLOAD runs once per name.
#
# Recognised name patterns (tried in this order):
#   fetch[_all][_by_<filter>][_HASHED_FROM_<keys>][_TO_<value_column>]
#       -> fetch_all(); without '_all' a single object per key is requested
#   count_all[_by_<filter>][_HASHED_FROM_<keys>]
#       -> count_all()
#   remove_all_by_<column>
#       -> remove_all() on "<column>='<value>'"
#   update_<columns>
#       -> update() restricted to the named columns
# <filter>, <keys> and <columns> may name several columns joined by '_AND_'
# (matched case-insensitively).
#
# For the fetch, count_all and remove_all_by forms, a column that is not in
# column_set() leads to a throw.  A method name matching no pattern only
# produces a warning.
#
# NOTE(review): filter values are placed between single quotes in the SQL
# text without any escaping, so a value containing a quote breaks the
# statement - confirm that callers only pass trusted values.
sub AUTOLOAD {
    our $AUTOLOAD;

    if($AUTOLOAD =~ /::fetch(_all)?(?:_by_(\w+?))?(?:_HASHED_FROM_(\w+?))?(?:_TO_(\w+?))?$/) {
        my $all             = $1;
        my $filter_string   = $2;
        my $key_string      = $3;
        my $value_column    = $4;

        my ($self) = @_;
        my $column_set = $self->column_set();

            # NB: this filtering happens BEFORE any possible overflow via analysis_data, so will not be done on overflow_columns
        my $filter_components = $filter_string && [ split(/_AND_/i, $filter_string) ];
        if($filter_components) {
            foreach my $column_name ( @$filter_components ) {
                unless($column_set->{$column_name}) {
                    throw("unknown column '$column_name'");
                }
            }
        }

            # the columns by which the result is to be hashed must exist as well:
        my $key_components = $key_string && [ split(/_AND_/i, $key_string) ];
        if($key_components) {
            foreach my $column_name ( @$key_components ) {
                unless($column_set->{$column_name}) {
                    throw("unknown column '$column_name'");
                }
            }
        }

        if($value_column && !$column_set->{$value_column}) {
            throw("unknown column '$value_column'");
        }

#        warn "Setting up '$AUTOLOAD' method\n";
            # install the new method; its arguments are the filter values, in the order of the filter columns:
        *$AUTOLOAD = sub {
            my $self = shift @_;
            return $self->fetch_all(
                $filter_components && join(' AND ', map { "$filter_components->[$_]='$_[$_]'" } 0..scalar(@$filter_components)-1),
                !$all,
                $key_components,
                $value_column
            );
        };
        goto &$AUTOLOAD;    # restart the new method

    } elsif($AUTOLOAD =~ /::count_all(?:_by_(\w+?))?(?:_HASHED_FROM_(\w+?))?$/) {
        my $filter_string   = $1;
        my $key_string      = $2;

        my ($self) = @_;
        my $column_set = $self->column_set();

            # both the filter columns and the key columns must exist in the table:
        my $filter_components = $filter_string && [ split(/_AND_/i, $filter_string) ];
        if($filter_components) {
            foreach my $column_name ( @$filter_components ) {
                unless($column_set->{$column_name}) {
                    throw("unknown column '$column_name'");
                }
            }
        }

        my $key_components = $key_string && [ split(/_AND_/i, $key_string) ];
        if($key_components) {
            foreach my $column_name ( @$key_components ) {
                unless($column_set->{$column_name}) {
                    throw("unknown column '$column_name'");
                }
            }
        }

#        warn "Setting up '$AUTOLOAD' method\n";
            # install the new method; its arguments are the filter values, in the order of the filter columns:
        *$AUTOLOAD = sub {
            my $self = shift @_;
            return $self->count_all(
                $filter_components && join(' AND ', map { "$filter_components->[$_]='$_[$_]'" } 0..scalar(@$filter_components)-1),
                $key_components,
            );
        };
        goto &$AUTOLOAD;    # restart the new method

    } elsif($AUTOLOAD =~ /::remove_all_by_(\w+)$/) {
            # here the whole remainder of the name is taken as ONE column name (it is not split on _AND_):
        my $filter_name = $1;

        my ($self) = @_;
        my $column_set = $self->column_set();

        if($column_set->{$filter_name}) {
#            warn "Setting up '$AUTOLOAD' method\n";
            *$AUTOLOAD = sub { my ($self, $filter_value) = @_; return $self->remove_all("$filter_name='$filter_value'"); };
            goto &$AUTOLOAD;    # restart the new method
        } else {
            throw("unknown column '$filter_name'");
        }
    } elsif($AUTOLOAD =~ /::update_(\w+)$/) {
            # the column names are not checked against column_set() in this branch:
        my @columns_to_update = split(/_AND_/i, $1);
#        warn "Setting up '$AUTOLOAD' method\n";
        *$AUTOLOAD = sub { my ($self, $object) = @_; return $self->update($object, @columns_to_update); };
        goto &$AUTOLOAD;    # restart the new method
    } else {
            # NOTE(review): an unrecognised method name is only warned about; the call then returns normally
        warn "sub '$AUTOLOAD' not implemented";
    }
}

1;