Ensembl/ensembl-hive

View on GitHub
modules/Bio/EnsEMBL/Hive/AnalysisJob.pm

Summary

Maintainability
Test Coverage
=pod 

=head1 NAME

    Bio::EnsEMBL::Hive::AnalysisJob

=head1 DESCRIPTION

    An AnalysisJob is the link between the input_id control data, the analysis and
    the rule system.  It also tracks the state of the job as it is processed.

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2021] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually preceded with a _

=cut


package Bio::EnsEMBL::Hive::AnalysisJob;

use strict;
use warnings;

use Bio::EnsEMBL::Hive::Utils ('stringify', 'destringify');
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;

use base (  'Bio::EnsEMBL::Hive::Cacheable',# mainly to inherit hive_pipeline() method
            'Bio::EnsEMBL::Hive::Storable', # inherit dbID(), adaptor() and new() methods
            'Bio::EnsEMBL::Hive::Params',   # inherit param management functionality
         );


=head1 AUTOLOADED

    prev_job_id / prev_job

    analysis_id / analysis

=cut


# Getter/setter for the job's input_id.
# When a reference is passed in, it is serialised with stringify() before being stored;
# any other value is stored as given. Always returns the currently stored value.
sub input_id {
    my ($self, @new_value) = @_;

    if (@new_value) {
        my $candidate = $new_value[0];
        $candidate = stringify($candidate) if ref($candidate);
        $self->{'_input_id'} = $candidate;
    }

    return $self->{'_input_id'};
}

# Getter/setter for the param_id_stack value.
# An undefined value (never set, or explicitly set to undef) is replaced by, and stored as, ''.
sub param_id_stack {
    my ($self, @new_value) = @_;

    $self->{'_param_id_stack'} = $new_value[0] if @new_value;
    $self->{'_param_id_stack'} //= '';

    return $self->{'_param_id_stack'};
}

# Getter/setter for the accu_id_stack value.
# An undefined value (never set, or explicitly set to undef) is replaced by, and stored as, ''.
sub accu_id_stack {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_accu_id_stack'} = $new_value[0];
    }
    if (!defined $self->{'_accu_id_stack'}) {
        $self->{'_accu_id_stack'} = '';
    }

    return $self->{'_accu_id_stack'};
}

# Plain getter/setter for the role_id value (no default: returns undef until set).
sub role_id {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_role_id'} = $new_value[0];
    }

    return $self->{'_role_id'};
}

# Getter/setter for the job's status.
# While no status is defined, one is derived and stored on the spot:
# 'SEMAPHORED' if semaphore_count() is positive, 'READY' otherwise.
sub status {
    my ($self, @new_value) = @_;

    $self->{'_status'} = $new_value[0] if @new_value;

    if (!defined $self->{'_status'}) {
        my $is_blocked = $self->semaphore_count() > 0;
        $self->{'_status'} = $is_blocked ? 'SEMAPHORED' : 'READY';
    }

    return $self->{'_status'};
}

# Getter/setter for the retry counter; an undefined value is replaced by, and stored as, 0.
sub retry_count {
    my ($self, @new_value) = @_;

    $self->{'_retry_count'} = $new_value[0] if @new_value;
    $self->{'_retry_count'} //= 0;

    return $self->{'_retry_count'};
}

# Plain getter/setter for the when_completed value (no default: returns undef until set).
sub when_completed {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_when_completed'} = $new_value[0];
    }

    return $self->{'_when_completed'};
}

# Getter/setter for the runtime_msec value; an undefined value is replaced by, and stored as, 0.
sub runtime_msec {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_runtime_msec'} = $new_value[0];
    }
    $self->{'_runtime_msec'} //= 0;

    return $self->{'_runtime_msec'};
}

# Getter/setter for the query_count value; an undefined value is replaced by, and stored as, 0.
sub query_count {
    my ($self, @new_value) = @_;

    $self->{'_query_count'} = $new_value[0] if @new_value;

    if (!defined $self->{'_query_count'}) {
        $self->{'_query_count'} = 0;
    }

    return $self->{'_query_count'};
}

# Getter/setter for the semaphore counter; an undefined value is replaced by, and stored as, 0.
sub semaphore_count {
    my ($self, @new_value) = @_;

    $self->{'_semaphore_count'} = $new_value[0] if @new_value;
    $self->{'_semaphore_count'} //= 0;

    return $self->{'_semaphore_count'};
}

# Plain getter/setter for the semaphored_job_id value (no default: returns undef until set).
sub semaphored_job_id {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_semaphored_job_id'} = $new_value[0];
    }

    return $self->{'_semaphored_job_id'};
}

# Sets the job's status in memory and, if the job has an adaptor,
# hands the job to the adaptor's check_in_job() as well.
sub set_and_update_status {
    my $self   = shift;
    my $status = shift;

    $self->status($status);

    my $adaptor = $self->adaptor;
    $adaptor->check_in_job($self) if $adaptor;     # jobs without an adaptor are only updated in memory
}

# Plain getter/setter for the stdout_file value (no default: returns undef until set).
sub stdout_file {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_stdout_file'} = $new_value[0];
    }

    return $self->{'_stdout_file'};
}

# Plain getter/setter for the stderr_file value (no default: returns undef until set).
sub stderr_file {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_stderr_file'} = $new_value[0];
    }

    return $self->{'_stderr_file'};
}

# Getter/setter for the job's accu_hash.
# An undefined value (never set, or explicitly set to undef) is replaced by, and stored as,
# a fresh empty hashref, so the same hashref is returned on subsequent calls.
sub accu_hash {
    my ($self, @new_value) = @_;

    $self->{'_accu_hash'} = $new_value[0] if @new_value;
    $self->{'_accu_hash'} //= {};

    return $self->{'_accu_hash'};
}


=head2 autoflow

    Title   :  autoflow
    Function:  Gets/sets flag for whether the job should
               be automatically dataflowed on branch 1 when the job completes.
               If the subclass manually sends a job along branch 1 with dataflow_output_id,
               the autoflow will turn itself off.
    Returns :  boolean (1=default|0)

=cut

# Getter/setter for the autoflow flag.
# An undefined value (never set, or explicitly set to undef) is replaced by, and stored as, 1.
sub autoflow {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_autoflow'} = $new_value[0];
    }
    $self->{'_autoflow'} //= 1;

    return $self->{'_autoflow'};
}


##-----------------[indicators to the Worker]--------------------------------


# Indicator to the Worker (plain getter/setter, no default).
# A Job should set this to 1 before dying - or before running code that might cause death,
# such as RunnableDB's compilation - when it believes the state of things will not allow
# the Worker to continue normally. The Worker checks the flag and terminates itself if it is true.
sub lethal_for_worker {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_lethal_for_worker'} = $new_value[0];
    }

    return $self->{'_lethal_for_worker'};
}

# Indicator to the Worker (plain getter/setter, no default).
# A Job should set this to 1 before dying (or before running code that might cause death)
# when retrying the same job without any changes makes sense, and to 0 when there is no point
# in re-trying (say, if the parameters are wrong).
# The Worker checks the flag and makes the necessary adjustments to the database state.
sub transient_error {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_transient_error'} = $new_value[0];
    }

    return $self->{'_transient_error'};
}

# Indicator to the Worker (plain getter/setter, no default).
# A Job should set this to 0 before throwing when the job is actually done
# but wants the thrown message to be recorded with is_error=0.
sub incomplete {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_incomplete'} = $new_value[0];
    }

    return $self->{'_incomplete'};
}


# "Sticky" indicator: a value passed in is only stored while the flag is still false,
# so once a true value has been recorded later calls cannot overwrite or clear it.
# A false or missing flag is normalised to (and stored as) 0 before being returned.
# NB: this first-true-value-wins behaviour is specific to this flag - do not copy it to other accessors.
sub died_somewhere {
    my ($self, @new_value) = @_;

    if (@new_value && !$self->{'_died_somewhere'}) {
        $self->{'_died_somewhere'} = $new_value[0];
    }

    $self->{'_died_somewhere'} = 0 unless $self->{'_died_somewhere'};

    return $self->{'_died_somewhere'};
}

##-----------------[/indicators to the Worker]-------------------------------


# Collects every source of parameters for this job, in the order they are pushed below,
# and hands the whole list to param_init():
#   1. the runnable object's param_defaults (only if a runnable object was given)
#   2. the pipeline-wide parameters
#   3. the analysis parameters                       (only for jobs with an adaptor and an analysis)
#   4. input_ids and accus of the jobs on the stacks (only for jobs with an adaptor and a non-empty stack)
#   5. the job's own input_id, then its own accu_hash
# For jobs with an adaptor the job's own accu_hash is (re)loaded via the AccumulatorAdaptor first.
sub load_parameters {
    my ($self, $runnable_object) = @_;

    my @param_sources = $runnable_object ? ( $runnable_object->param_defaults ) : ();

    push @param_sources, $self->hive_pipeline->params_as_hash;

    my $job_adaptor = $self->adaptor;
    if ($job_adaptor) {
        my $own_job_id   = $self->dbID;
        my $accu_adaptor = $job_adaptor->db->get_AccumulatorAdaptor;

        my $own_accus = $accu_adaptor->fetch_structures_for_job_ids( $own_job_id );
        $self->accu_hash( $own_accus->{ $own_job_id } );

        if ($self->analysis) {
            push @param_sources, $self->analysis->parameters;
        }

        if ($self->param_id_stack or $self->accu_id_stack) {
                # the fetch arguments (2,0) and (2,1) are chosen so that, FOR EACH ID on the stacks,
                # the input_id gets lower precedence and the accu gets higher precedence
            my $stacked_input_ids = $job_adaptor->fetch_input_ids_for_job_ids( $self->param_id_stack, 2, 0 );
            my $stacked_accus     = $accu_adaptor->fetch_structures_for_job_ids( $self->accu_id_stack, 2, 1 );
            my %stacked_sources   = ( %$stacked_input_ids, %$stacked_accus );

                # the keys are compared numerically, and the values are added in that order
            foreach my $source_key (sort { $a <=> $b } keys %stacked_sources) {
                push @param_sources, $stacked_sources{ $source_key };
            }
        }
    }

    push @param_sources, $self->input_id, $self->accu_hash;

    return $self->param_init( @param_sources );
}


# Self-initializing getter (there is no setter): the first call creates and stores an empty hashref.
# Returns a hash-of-lists of the form
#   { 2 => [list of jobs waiting to be funneled into 2], 3 => [list of jobs waiting to be funneled into 3], etc }
sub fan_cache {
    my ($self) = @_;

    $self->{'_fan_cache'} = {} unless $self->{'_fan_cache'};

    return $self->{'_fan_cache'};
}

=head2 dataflow_output_id

    Title        :  dataflow_output_id
    Arg[1](req)  :  <hashref|string|arrayref> $output_id (a hashref, its stringified form, or an arrayref of those)
    Arg[2](opt)  :  <int|string> $branch_name_or_code (optional, defaults to 1)
    Usage        :  $self->dataflow_output_id($output_id, $branch_name_or_code);
    Function:  
      If a RunnableDB(Process) needs to create jobs, this allows it to have jobs 
      created and flowed through the dataflow rules of the workflow graph.
      This 'output_id' becomes the 'input_id' of the newly created job at
      the ends of the dataflow pipes.  The optional 'branch_name_or_code' determines
      which dataflow pipe(s) to flow the job through.      

=cut

# Flows one or more output_ids along the dataflow rules registered for the given branch
# of this job's analysis.
#
#   $output_ids          : a hashref, a string (passed through destringify()), undef,
#                          or an arrayref of those. Each element must end up as undef or a hashref.
#   $branch_name_or_code : converted into a branch code by DataflowRuleAdaptor::branch_name_2_code()
#
# Returns an arrayref that collects the contents of the first value returned by every
# to_analysis->dataflow() call made here.
# Dies if an output_id is defined but is not a hashref, or if jobs are cached in fan_cache for a rule
# and the number of (output_id, target) pairs selected for that rule is not exactly 1.
# Side effect: switches autoflow off when the branch code is 1.
sub dataflow_output_id {
    my ($self, $output_ids, $branch_name_or_code) = @_;

    my $input_id                = $self->input_id();
    my $hive_use_param_stack    = $self->hive_pipeline->hive_use_param_stack;

    $output_ids    =  destringify($output_ids) unless ref($output_ids);     # destringify the string
    $output_ids    = [ $output_ids ] unless(ref($output_ids) eq 'ARRAY');   # force previously used single values into an arrayref

        # second pass: individual elements of the list may themselves still be strings
    my @destringified_output_ids;
    foreach my $output_id (@$output_ids) {
        $output_id = destringify($output_id) unless ref($output_id);        # destringify the string
        if ((defined $output_id) and (ref($output_id) ne 'HASH')) {         # Only undefs and hashrefs work as input_ids
            die stringify($output_id)." is not a hashref ! Cannot dataflow";
        }
        push @destringified_output_ids, $output_id;
    }

        # map branch names to numbers:
    my $branch_code = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name_or_code);

        # if branch_code is set to 1 (explicitly or implicitly), turn off automatic dataflow:
    $self->autoflow(0) if($branch_code == 1);

    my @output_job_ids = ();

        # fan rules come sorted before funnel rules for the same branch_code:
    foreach my $df_rule ( @{ $self->analysis->dataflow_rules_by_branch->{$branch_code} || [] } ) {

            # each element is used below as a triple: [0] = condition (undef for DEFAULT), [1] = list of targets,
            # [2] = list of output_ids that matched the condition (filled in by the loop below)
        my $targets_grouped_by_condition    = $df_rule->get_my_targets_grouped_by_condition;    # the pairs are deliberately ordered to put the DEFAULT branch last
        my @conditions                      = map { $_->[0] } @$targets_grouped_by_condition;

            # counts (output_id, target) pairs selected for this rule
        my $total_output_ids_for_the_rule = 0;

        foreach my $output_id (@destringified_output_ids) {  # filter the output_ids and place them into the [2] part of $targets_grouped_by_condition
            my $condition_match_count = 0;
            foreach my $condition_idx (0..@conditions-1) {
                my $unsubstituted_condition = $conditions[$condition_idx];

                if(defined($unsubstituted_condition)) {
                        # the condition is wrapped into #expr(...)expr# and evaluated by param_substitute(),
                        # which is also given the $output_id; a true result counts as a match
                    if(my $substituted_condition = $self->param_substitute('#expr('.$unsubstituted_condition.')expr#', $output_id)) {
                        $condition_match_count++;
                    } else {
                        next;   # non-DEFAULT condition branch failed
                    }
                } elsif($condition_match_count) {
                    next;   # DEFAULT condition branch failed, because one of the conditions fired
                } else {
                    # DEFAULT condition branch succeeded => follow to the push
                }

                    # only reached when this condition group accepted the output_id
                push @{$targets_grouped_by_condition->[$condition_idx][2]}, $output_id;
                $total_output_ids_for_the_rule += scalar( @{ $targets_grouped_by_condition->[$condition_idx][1] } );
            }
        }

            # fan_cache is looked up by the stringified rule; a non-empty list there means jobs are cached for this rule
            # (presumably fan jobs waiting for their funnel - see fan_cache()), in which case exactly one
            # (output_id, target) pair is accepted
        my $fan_cache_for_this_rule = exists($self->fan_cache->{"$df_rule"}) && $self->fan_cache->{"$df_rule"};
        if($fan_cache_for_this_rule && @$fan_cache_for_this_rule && $total_output_ids_for_the_rule!=1) {
            die "The total number of funnel output_ids (considering ".scalar(@conditions)." conditions) was $total_output_ids_for_the_rule, but expected to be 1. Please investigate";
        }

        foreach my $triple (@$targets_grouped_by_condition) {
            my ($unsubstituted_condition, $df_targets, $filtered_output_ids) = @$triple;

                # condition groups that did not accept any output_id are skipped
            if($filtered_output_ids && @$filtered_output_ids) {

                foreach my $df_target (@$df_targets) {

                    my $extend_param_stack = $hive_use_param_stack || $df_target->extend_param_stack;   # this boolean is target-specific

                        # by default replicate the parameters of the parent in the child (undef becomes {}+stack or $input_id, depending on INPUT_PLUS)
                    my @pre_substituted_output_ids = map { $_ // ($extend_param_stack ? {} : $input_id) } @$filtered_output_ids;

                        # parameter substitution into input_id_template is rule-specific
                    my $output_ids_for_this_rule;
                    if(my $template_string = $df_target->input_id_template()) {
                            # the template is destringified once, then substituted separately for every output_id
                        my $template_hash = destringify($template_string);
                        $output_ids_for_this_rule = [ map { $self->param_substitute($template_hash, $_) } @pre_substituted_output_ids ];
                    } else {
                        $output_ids_for_this_rule = \@pre_substituted_output_ids;
                    }

                        # dataflow() is called in list context; only its first return value (used as an arrayref) is kept
                    my ($stored_listref) = $df_target->to_analysis->dataflow( $output_ids_for_this_rule, $self, $extend_param_stack, $df_rule );

                    push @output_job_ids, @$stored_listref;

                } # /foreach my $df_target
            } # /if(filtered_output_ids are workable)
        } # /foreach my $triple
    } # /foreach my $df_rule

    return \@output_job_ids;
}


# Returns a one-line, human-readable description of the job:
# its dbID, analysis (logic_name and analysis_id), input_id, status and retry_count.
# A false dbID and a missing analysis are both shown as '(NULL)'.
sub toString {
    my ($self) = @_;

    my $analysis       = $self->analysis;
    my $analysis_label = $analysis
        ? ( $analysis->logic_name.'('.$self->analysis_id.')' )
        : '(NULL)';

        # input_id() has no default, so it may still be undef here:
        # fall back to '' to keep the same output without an "uninitialized value" warning
    my $input_id = $self->input_id // '';

    return 'Job dbID='.($self->dbID || '(NULL)')." analysis=$analysis_label, input_id='$input_id', status=".$self->status.", retry_count=".$self->retry_count;
}


1;