Ensembl/ensembl-hive

View on GitHub
modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm

Summary

Maintainability
Test Coverage
=pod

=head1 NAME

    Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor

=head1 SYNOPSIS

    $analysisJobAdaptor = $db_adaptor->get_AnalysisJobAdaptor;
    $analysisJobAdaptor = $analysisJob->adaptor;

=head1 DESCRIPTION

    Module to encapsulate all db access for persistent class AnalysisJob.
    There should be just one per application and database connection.

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2021] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are preceded with a _

=cut


package Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;

use strict;
use warnings;

use Bio::EnsEMBL::Hive::AnalysisJob;
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
use Bio::EnsEMBL::Hive::Utils ('stringify');

use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');


# This variable must be kept up-to-date ! It is used in a number of queries.
# CLAIMED is missing on purpose because not all the queries actually need it.
my $ALL_STATUSES_OF_RUNNING_JOBS = q{'PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_HEALTHCHECK','POST_CLEANUP'};


sub default_table_name {
    return 'job';
}


sub object_class {
    return 'Bio::EnsEMBL::Hive::AnalysisJob';
}


sub default_overflow_limit {
    return {
        'input_id'          => 255,
        'param_id_stack'    =>  64,
        'accu_id_stack'     =>  64,
    };
}


=head2 fetch_by_analysis_id_and_input_id

  Arg [1]    : Integer $analysis_id
  Arg [2]    : String $input_id
  Example    : $funnel_job = $job_adaptor->fetch_by_analysis_id_and_input_id( $funnel_job->analysis->dbID, $funnel_job->input_id);
  Description: Attempts to find the job by contents, then makes another attempt if the input_id is expected to have overflown into analysis_data
  Returntype : AnalysisJob object

=cut

sub fetch_by_analysis_id_and_input_id {     # It is a special case not covered by AUTOLOAD; note the lowercase _and_
    my ($self, $analysis_id, $input_id) = @_;

    my $job = $self->fetch_by_analysis_id_AND_input_id( $analysis_id, $input_id);

    if(!$job and length($input_id)>$self->default_overflow_limit->{input_id}) {
        if(my $ext_data_id = $self->db->get_AnalysisDataAdaptor->fetch_by_data_to_analysis_data_id( $input_id )) {
            $job = $self->fetch_by_analysis_id_AND_input_id( $analysis_id, "_extended_data_id $ext_data_id");
        }
    }
    return $job;
}


=head2 store_jobs_and_adjust_counters

  Arg [1]    : arrayref of Bio::EnsEMBL::Hive::AnalysisJob $jobs_to_store
  Arg [2]    : (optional) boolean $push_new_semaphore
  Example    : my @output_job_ids = @{ $job_adaptor->store_jobs_and_adjust_counters( \@jobs_to_store ) };
  Description: Attempts to store a list of jobs, returns an arrayref of successfully stored job_ids
  Returntype : Reference to list of job_dbIDs

=cut

sub store_jobs_and_adjust_counters {
    my ($self, $jobs, $push_new_semaphore) = @_;

        # NB: our use patterns assume all jobs from the same storing batch share the same semaphored_job_id:
    my $semaphored_job_id                   = scalar(@$jobs) && $jobs->[0]->semaphored_job_id();
    my $need_to_increase_semaphore_count    = ($semaphored_job_id && !$push_new_semaphore);

    my @output_job_ids              = ();
    my $failed_to_store_local_jobs  = 0;

    foreach my $job (@$jobs) {

        my $analysis    = $job->analysis;
        my $job_adaptor = $analysis ? $analysis->adaptor->db->get_AnalysisJobAdaptor : $self;   # if analysis object is undefined, consider the job local
        my $prev_adaptor= ($job->prev_job && $job->prev_job->adaptor) || '';
        my $local_job   = $prev_adaptor eq $job_adaptor;

            # avoid deadlocks when dataflowing under transactional mode (used in Ortheus Runnable for example):
        if($need_to_increase_semaphore_count and $local_job and ($job_adaptor->dbc->driver ne 'sqlite')) {
            $job_adaptor->dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" );
        }

        $job->prev_job( undef ) unless( $local_job );   # break the link with the previous job if dataflowing across databases (current schema doesn't support URLs for job_ids)

        my ($job, $stored_this_time) = $job_adaptor->store( $job );

        if($stored_this_time) {
            if($need_to_increase_semaphore_count and $local_job) {  # if we are not creating a new semaphore (where dependent jobs have already been counted),
                                                                    # but rather propagating an existing one (same or other level), we have to up-adjust the counter
                $self->increase_semaphore_count_for_jobid( $semaphored_job_id );
            }

            unless($job_adaptor->db->hive_pipeline->hive_use_triggers()) {
                $job_adaptor->dbc->do(qq{
                        UPDATE analysis_stats
                        SET total_job_count=total_job_count+1
                    }
                    .(($job->status eq 'READY')
                        ? " ,ready_job_count=ready_job_count+1 "
                        : " ,semaphored_job_count=semaphored_job_count+1 "
                    ).(($job_adaptor->dbc->driver eq 'pgsql')
                        ? " ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) "
                        : " ,status =      CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END "
                    )." WHERE analysis_id=".$job->analysis_id
                );
            }

            push @output_job_ids, $job->dbID();

        } elsif( $local_job ) {
            $self->db->get_LogMessageAdaptor->store_hive_message( "JobAdaptor failed to store the local Job( analysis_id=".$job->analysis_id.', '.$job->input_id." ), possibly due to a collision", 0 );

            $failed_to_store_local_jobs++;
        }
    }

        # adjust semaphore_count for jobs that failed to be stored (but have been pre-counted during funnel's creation):
    if($push_new_semaphore and $failed_to_store_local_jobs) {
        $self->decrease_semaphore_count_for_jobid( $semaphored_job_id, $failed_to_store_local_jobs );
    }

    return \@output_job_ids;
}


=head2 fetch_all_by_analysis_id_status

  Arg [1]    : (optional) listref $list_of_analyses
  Arg [2]    : (optional) string $status
  Arg [3]    : (optional) int $retry_at_least
  Example    : $all_failed_jobs = $adaptor->fetch_all_by_analysis_id_status(undef, 'FAILED');
               $analysis_done_jobs = $adaptor->fetch_all_by_analysis_id_status( $list_of_analyses, 'DONE');
  Description: Returns a list of all jobs filtered by given analysis_id (if specified) and given status (if specified).
  Returntype : reference to list of Bio::EnsEMBL::Hive::AnalysisJob objects

=cut

sub fetch_all_by_analysis_id_status {
    my ($self, $list_of_analyses, $status, $retry_count_at_least) = @_;

    my @constraints = ();

    if($list_of_analyses) {
        if(ref($list_of_analyses) eq 'ARRAY') {
            push @constraints, "analysis_id IN (".(join(',', map {$_->dbID} @$list_of_analyses)).")";
        } else {
            push @constraints, "analysis_id=$list_of_analyses"; # for compatibility with old interface
        }
    }

    push @constraints, "status='$status'"                     if ($status);
    push @constraints, "retry_count >= $retry_count_at_least" if ($retry_count_at_least);

    return $self->fetch_all( join(" AND ", @constraints) );
}


sub fetch_some_by_analysis_id_limit {
    my ($self, $analysis_id, $limit) = @_;

    return $self->fetch_all( "analysis_id = '$analysis_id' LIMIT $limit" );
}


sub fetch_all_incomplete_jobs_by_role_id {
    my ($self, $role_id) = @_;

    my $constraint = "status IN ('CLAIMED',$ALL_STATUSES_OF_RUNNING_JOBS) AND role_id='$role_id'";
    return $self->fetch_all($constraint);
}


sub fetch_all_unfinished_jobs_with_no_roles {
    my $self = shift;

        # the list should contain all status'es that are not "in progress":
    return $self->fetch_all( "role_id IS NULL AND status NOT IN ('DONE', 'READY', 'FAILED', 'PASSED_ON', 'SEMAPHORED')" );
}


sub fetch_by_url_query {
    my ($self, $field_name, $field_value) = @_;

    if($field_name eq 'dbID' and $field_value) {

        return $self->fetch_by_dbID($field_value);

    } else {

        return;

    }
}


sub fetch_job_counts_hashed_by_status {
    my ($self, $requested_analysis_id) = @_;

    my %job_counts = ();

        # Note: this seemingly useless dummy_analysis_id is here to force MySQL use existing index on (analysis_id, status)
    my $sql = "SELECT analysis_id, status, count(*) FROM job WHERE analysis_id=? GROUP BY analysis_id, status";
    my $sth = $self->prepare($sql);
    $sth->execute( $requested_analysis_id );

    while (my ($dummy_analysis_id, $status, $job_count)=$sth->fetchrow_array()) {
        $job_counts{ $status } = $job_count;
    }

    $sth->finish;

    return \%job_counts;
}


########################
#
# STORE / UPDATE METHODS
#
########################


sub decrease_semaphore_count_for_jobid {    # used in semaphore annihilation or unsuccessful creation
    my $self  = shift @_;
    my $jobid = shift @_ or return;
    my $dec   = shift @_ || 1;

        # NB: BOTH THE ORDER OF UPDATES AND EXACT WORDING IS ESSENTIAL FOR SYNCHRONOUS ATOMIC OPERATION,
        #       otherwise the same command tends to behave differently on MySQL and SQLite (at least)
        #
    my $sql = "UPDATE job "
        .( ($self->dbc->driver eq 'pgsql')
            ? "SET status = CAST(CASE WHEN semaphore_count>$dec THEN 'SEMAPHORED' ELSE 'READY' END AS job_status), "
            : "SET status =      CASE WHEN semaphore_count>$dec THEN 'SEMAPHORED' ELSE 'READY' END, "
        ).qq{
            semaphore_count=semaphore_count-?
        WHERE job_id=? AND status='SEMAPHORED'
    };
    
    $self->dbc->protected_prepare_execute( [ $sql, $dec, $jobid ],
        sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message( 'decreasing semaphore_count'.$after, 0 ); }
    );
}

sub increase_semaphore_count_for_jobid {    # used in semaphore propagation
    my $self  = shift @_;
    my $jobid = shift @_ or return;
    my $inc   = shift @_ || 1;

    my $sql = qq{
        UPDATE job
        SET semaphore_count=semaphore_count+?
        WHERE job_id=?
    };
    
    $self->dbc->protected_prepare_execute( [ $sql, $inc, $jobid ],
        sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message( 'increasing semaphore_count'.$after, 0 ); }
    );
}


=head2 check_in_job

  Arg [1]    : $analysis_id
  Example    :
  Description: updates the job.status in the database
  Returntype : 
  Exceptions :
  Caller     : general

=cut

sub check_in_job {
    my ($self, $job) = @_;

    my $job_id = $job->dbID;

    my $sql = "UPDATE job SET status='".$job->status."' ";

    if($job->status eq 'DONE') {
        $sql .= ",when_completed=CURRENT_TIMESTAMP";
        $sql .= ",runtime_msec=".$job->runtime_msec;
        $sql .= ",query_count=".$job->query_count;
    } elsif($job->status eq 'PASSED_ON') {
        $sql .= ", when_completed=CURRENT_TIMESTAMP";
    } elsif($job->status eq 'READY') {
    }

    $sql .= " WHERE job_id='$job_id' ";

        # This particular query is infamous for collisions and 'deadlock' situations; let's wait and retry:
    $self->dbc->protected_prepare_execute( [ $sql ],
        sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_job_message( $job_id, "checking the job in".$after, 0 ); }
    );
}


=head2 store_out_files

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisJob $job
  Example    :
  Description: update locations of log files, if present
  Returntype : 
  Exceptions :
  Caller     : Bio::EnsEMBL::Hive::Worker

=cut

sub store_out_files {
    my ($self, $job) = @_;

    # FIXME: An UPSERT would be better here, but it is only promised in PostgreSQL starting from 9.5, which is not officially out yet.

    my $delete_sql  = 'DELETE from job_file WHERE job_id=' . $job->dbID . ' AND retry='.$job->retry_count;
    $self->dbc->do( $delete_sql );

    if($job->stdout_file or $job->stderr_file) {
        my $insert_sql = 'INSERT INTO job_file (job_id, retry, role_id, stdout_file, stderr_file) VALUES (?,?,?,?,?)';
        my $insert_sth = $self->dbc->prepare($insert_sql);
        $insert_sth->execute( $job->dbID, $job->retry_count, $job->role_id, $job->stdout_file, $job->stderr_file );
        $insert_sth->finish();
    }
}


=head2 reset_or_grab_job_by_dbID

  Arg [1]    : int $job_id
  Arg [2]    : int $role_id (optional)
  Description: resets a job to to 'READY' (if no $role_id given) or directly to 'CLAIMED' so it can be run again, and fetches it.
               NB: Will also reset a previously 'SEMAPHORED' job to READY.
               The retry_count will be set to 1 for previously run jobs (partially or wholly) to trigger PRE_CLEANUP for them,
               but will not change retry_count if a job has never *really* started.
  Returntype : Bio::EnsEMBL::Hive::AnalysisJob or undef

=cut

sub reset_or_grab_job_by_dbID {
    my ($self, $job_id, $role_id) = @_;

    my $new_status  = $role_id ? 'CLAIMED' : 'READY';

        # Note: the order of the fields being updated is critical!
    my $sql = qq{
        UPDATE job
           SET retry_count = CASE WHEN (status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END
             , status=?
             , role_id=?
         WHERE job_id=?
    };
    my @values = ($new_status, $role_id, $job_id);

    my $sth = $self->prepare( $sql );
    my $return_code = $sth->execute( @values )
        or die "Could not run\n\t$sql\nwith data:\n\t(".join(',', @values).')';
    $sth->finish;

    my $job = $self->fetch_by_job_id_AND_status($job_id, $new_status) ;

    return $job;
}


=head2 grab_jobs_for_role

  Arg [1]           : Bio::EnsEMBL::Hive::Role object $role
  Arg [2]           : int $how_many_this_role
  Example: 
    my $jobs  = $job_adaptor->grab_jobs_for_role( $role, $how_many );
  Description: 
    For the specified Role, it will search available jobs, 
    and using the how_many_this_batch parameter, claim/fetch that
    number of jobs, and then return them.
  Returntype : 
    reference to array of Bio::EnsEMBL::Hive::AnalysisJob objects
  Caller     : Bio::EnsEMBL::Hive::Worker::run

=cut

sub grab_jobs_for_role {
    my ($self, $role, $how_many_this_batch) = @_;

    return [] unless( $how_many_this_batch );
  
    my $analysis_id     = $role->analysis_id;
    my $role_id         = $role->dbID;
    my $role_rank       = $self->db->get_RoleAdaptor->get_role_rank( $role );
    my $offset          = $how_many_this_batch * $role_rank;

    my $prefix_sql = ($self->dbc->driver eq 'mysql') ? qq{
         UPDATE job j
           JOIN (
                            SELECT job_id
                              FROM job
                             WHERE analysis_id='$analysis_id'
                               AND status='READY'
    } : qq{
         UPDATE job
           SET role_id='$role_id', status='CLAIMED'
         WHERE job_id in (
                            SELECT job_id
                              FROM job
                             WHERE analysis_id='$analysis_id'
                               AND status='READY'
    };
    my $virgin_sql = qq{       AND retry_count=0 };
    my $limit_sql  = qq{     LIMIT $how_many_this_batch };
    my $offset_sql = qq{    OFFSET $offset };
    my $suffix_sql = ($self->dbc->driver eq 'mysql') ? qq{
                 ) as x
         USING (job_id)
           SET j.role_id='$role_id', j.status='CLAIMED'
         WHERE j.status='READY'
    } : qq{
                 )
           AND status='READY'
    };

    my $claim_count;

        # we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
    if(  0 == ($claim_count = $self->dbc->protected_prepare_execute( [ $prefix_sql . $virgin_sql . $limit_sql . $offset_sql . $suffix_sql ],
                    sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $role->worker, "grabbing a virgin batch of offset jobs".$after, 0 ); }
    ))) {
        if( 0 == ($claim_count = $self->dbc->protected_prepare_execute( [ $prefix_sql .               $limit_sql . $offset_sql . $suffix_sql ],
                        sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $role->worker, "grabbing a non-virgin batch of offset jobs".$after, 0 ); }
        ))) {
             $claim_count = $self->dbc->protected_prepare_execute( [ $prefix_sql .               $limit_sql .               $suffix_sql ],
                            sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $role->worker, "grabbing a non-virgin batch of non-offset jobs".$after, 0 ); }
             );
        }
    }

    $self->db->get_AnalysisStatsAdaptor->increment_a_counter( 'ready_job_count', -$claim_count, $analysis_id );

    return $claim_count ? $self->fetch_all_by_role_id_AND_status($role_id, 'CLAIMED') : [];
}


sub release_claimed_jobs_from_role {
    my ($self, $role) = @_;

        # previous value of role_id is not important, because that Role never had a chance to run the jobs
    my $num_released_jobs = $self->dbc->protected_prepare_execute( [ "UPDATE job SET status='READY', role_id=NULL WHERE role_id=? AND status='CLAIMED'", $role->dbID ],
        sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $role->worker, "releasing claimed jobs from role".$after, 0 ); }
    );

    my $analysis_stats_adaptor  = $self->db->get_AnalysisStatsAdaptor;
    my $analysis_id             = $role->analysis_id;

    $analysis_stats_adaptor->increment_a_counter( 'ready_job_count', $num_released_jobs, $analysis_id );

#    $analysis_stats_adaptor->update_status( $analysis_id, 'LOADING' );
}


=head2 release_undone_jobs_from_role

  Arg [1]    : Bio::EnsEMBL::Hive::Role object
  Arg [2]    : optional message to be recorded in 'job_message' table
  Example    :
  Description: If a Worker has died some of its jobs need to be reset back to 'READY'
               so they can be rerun.
               Jobs in state CLAIMED as simply reset back to READY.
               If jobs was 'in progress' (see the $ALL_STATUSES_OF_RUNNING_JOBS variable)
               the retry_count is increased and the status set back to READY.
               If the retry_count >= $max_retry_count (3 by default) the job is set
               to 'FAILED' and not rerun again.
  Exceptions : $role must be defined
  Caller     : Bio::EnsEMBL::Hive::Queen

=cut

sub release_undone_jobs_from_role {
    my ($self, $role, $msg) = @_;

    my $role_id         = $role->dbID;
    my $analysis        = $role->analysis;
    my $max_retry_count = $analysis->max_retry_count;
    my $worker          = $role->worker;

        #first just reset the claimed jobs, these don't need a retry_count index increment:
    $self->release_claimed_jobs_from_role( $role );

    my $sth = $self->prepare( qq{
        SELECT job_id
          FROM job
         WHERE role_id='$role_id'
           AND status in ($ALL_STATUSES_OF_RUNNING_JOBS)
    } );
    $sth->execute();

    my $cod = $worker->cause_of_death() || 'UNKNOWN';
    $msg ||= "GarbageCollector: The worker died because of $cod";

    my $resource_overusage = ($cod eq 'MEMLIMIT') || ($cod eq 'RUNLIMIT' and $worker->work_done()==0);

    while(my ($job_id) = $sth->fetchrow_array()) {

        my $passed_on = 0;  # the flag indicating that the garbage_collection was attempted and was successful

        if( $resource_overusage ) {
            if($passed_on = $self->gc_dataflow( $analysis, $job_id, $cod )) {
                $msg .= ', performing gc_dataflow';
            }
        }
        unless($passed_on) {
            if($passed_on = $self->gc_dataflow( $analysis, $job_id, 'ANYFAILURE' )) {
                $msg .= ", performing 'ANYFAILURE' gc_dataflow";
            }
        }

        $self->db()->get_LogMessageAdaptor()->store_job_message($job_id, $msg, not $passed_on );

        unless($passed_on) {
            $self->release_and_age_job( $job_id, $max_retry_count, not $resource_overusage );
        }

        $role->register_attempt( 0 );
    }
    $sth->finish();
}


sub release_and_age_job {
    my ($self, $job_id, $max_retry_count, $may_retry, $runtime_msec) = @_;
    $may_retry ||= 0;
    $runtime_msec = "NULL" unless(defined $runtime_msec);
        # NB: The order of updated fields IS important. Here we first find out the new status and then increment the retry_count:
        #
        # FIXME: would it be possible to retain role_id for READY jobs in order to temporarily keep track of the previous (failed) worker?
        #
    $self->dbc->do( 
        "UPDATE job "
        .( ($self->dbc->driver eq 'pgsql')
            ? "SET status = CAST(CASE WHEN ($may_retry != 0) AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END AS job_status), "
            : "SET status =      CASE WHEN $may_retry AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END, "
         ).qq{
               retry_count=retry_count+1,
               runtime_msec=$runtime_msec
         WHERE job_id=$job_id
           AND status in ('CLAIMED',$ALL_STATUSES_OF_RUNNING_JOBS)
    } );

        # FIXME: move the decision making completely to the API side and so avoid the potential race condition.
    my $job         = $self->fetch_by_dbID( $job_id );

    $self->db->get_AnalysisStatsAdaptor->increment_a_counter( ($job->status eq 'FAILED') ? 'failed_job_count' : 'ready_job_count', 1, $job->analysis_id );
}


=head2 gc_dataflow

    Description:    perform automatic dataflow from a dead job that overused resources if a corresponding dataflow rule was provided
                    Should only be called once during garbage collection phase, when the job is definitely 'abandoned' and not being worked on.

=cut

sub gc_dataflow {
    my ($self, $analysis, $job_id, $branch_name) = @_;

    my $branch_code = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name);

    unless( $self->db->get_DataflowRuleAdaptor->count_all_by_from_analysis_id_AND_branch_code($analysis->dbID, $branch_code) ) {
        return 0;   # just return if no corresponding gc_dataflow rule has been defined
    }

    my $job = $self->fetch_by_dbID($job_id);
    $job->analysis( $analysis );

    $job->load_parameters();    # input_id_templates still supported, however to a limited extent

    $job->dataflow_output_id( $job->input_id() , $branch_name );

    $job->set_and_update_status('PASSED_ON');

        # PASSED_ON jobs are included in done_job_count
    $self->db->get_AnalysisStatsAdaptor->increment_a_counter( 'done_job_count', 1, $analysis->dbID );

    if(my $semaphored_job_id = $job->semaphored_job_id) {
        $self->decrease_semaphore_count_for_jobid( $semaphored_job_id );    # step-unblock the semaphore
    }
    
    return 1;
}


=head2 reset_jobs_for_analysis_id

  Arg [1]    : int $analysis_id
  Arg [2]    : bool $all (false by default)
  Description: Resets either all FAILED jobs of an analysis (default)
                or ALL jobs of an analysis to 'READY' and their retry_count to 0.
  Caller     : beekeeper.pl

=cut

sub reset_jobs_for_analysis_id {
    my ($self, $list_of_analyses, $input_statuses) = @_;

    my $analyses_filter = ( ref($list_of_analyses) eq 'ARRAY' )
        ? 'analysis_id IN ('.join(',', map { $_->dbID } @$list_of_analyses).')'
        : 'analysis_id='.$list_of_analyses;     # compatibility mode (to be deprecated)

    my $statuses_filter = (ref($input_statuses) eq 'ARRAY')
        ? 'AND status IN ('.join(', ', map { "'$_'" } @$input_statuses).')'
        : (!$input_statuses)
            ? "AND status='FAILED'"             # compatibility mode (to be deprecated)
            : '';

    my $sql = qq{
            UPDATE job
           SET retry_count = CASE WHEN (status='READY' OR status='CLAIMED') THEN 0 ELSE 1 END,
        }. ( ($self->dbc->driver eq 'pgsql')
            ? "status = CAST(CASE WHEN semaphore_count>0 THEN 'SEMAPHORED' ELSE 'READY' END AS job_status) "
            : "status =      CASE WHEN semaphore_count>0 THEN 'SEMAPHORED' ELSE 'READY' END "
        )." WHERE ".$analyses_filter
        .' '. $statuses_filter;

    my $sth = $self->prepare($sql);
    $sth->execute();
    $sth->finish;

    if( ref($list_of_analyses) eq 'ARRAY' ) {
        foreach my $analysis ( @$list_of_analyses ) {
            $self->db->get_AnalysisStatsAdaptor->update_status($analysis->dbID, 'LOADING');
        }
    } else {
        $self->db->get_AnalysisStatsAdaptor->update_status($list_of_analyses, 'LOADING');   # compatibility mode (to be deprecated)
    }
}


=head2 balance_semaphores

  Description: Reset all semaphore_counts to the numbers of unDONE semaphoring jobs.

=cut

sub balance_semaphores {
    my ($self, $list_of_analyses) = @_;

    my $analysis_filter = $list_of_analyses
        ? "funnel.analysis_id IN (".join(',', map { $_->dbID } @$list_of_analyses).") AND"
        : '';

    my $find_sql    = qq{
                        SELECT * FROM (
                            SELECT funnel.job_id, funnel.semaphore_count AS was, COALESCE(COUNT(CASE WHEN fan.status!='DONE' AND fan.status!='PASSED_ON' THEN 1 ELSE NULL END),0) AS should
                            FROM job funnel
                            LEFT JOIN job fan ON (funnel.job_id=fan.semaphored_job_id)
                            WHERE $analysis_filter
                            funnel.status in ('SEMAPHORED', 'READY')
                            GROUP BY funnel.job_id
                         ) AS internal WHERE was<>should OR should=0
                     };

    my $update_sql  = "UPDATE job SET "
        ." semaphore_count=semaphore_count+? , "
        .( ($self->dbc->driver eq 'pgsql')
            ? "status = CAST(CASE WHEN semaphore_count>0 THEN 'SEMAPHORED' ELSE 'READY' END AS job_status) "
            : "status =      CASE WHEN semaphore_count>0 THEN 'SEMAPHORED' ELSE 'READY' END "
        )." WHERE job_id=? AND status IN ('SEMAPHORED', 'READY')";

    my $find_sth    = $self->prepare($find_sql);
    my $update_sth  = $self->prepare($update_sql);

    my $rebalanced_jobs_counter = 0;

    $find_sth->execute();
    while(my ($job_id, $was, $should) = $find_sth->fetchrow_array()) {
        my $msg;
        if(0<$should and $should<$was) {    # we choose not to lower the counter if it's not time to unblock yet
            $msg = "Semaphore count may need rebalancing, but it is not critical now, so leaving it on automatic: $was -> $should";
            $self->db->get_LogMessageAdaptor->store_job_message( $job_id, $msg, 0 );
        } else {
            $update_sth->execute($should-$was, $job_id);
            $msg = "Semaphore count needed rebalancing now, so performing: $was -> $should";
            $self->db->get_LogMessageAdaptor->store_job_message( $job_id, $msg, 1 );
            $rebalanced_jobs_counter++;
        }
        warn "[Job $job_id] $msg\n";    # TODO: integrate the STDERR diagnostic output with LogMessageAdaptor calls in general
    }
    $find_sth->finish;
    $update_sth->finish;

    return $rebalanced_jobs_counter;
}


sub fetch_input_ids_for_job_ids {
    my ($self, $job_ids_csv, $id_scale, $id_offset) = @_;
    $id_scale   ||= 1;
    $id_offset  ||= 0;

    my %input_ids = ();

    if( $job_ids_csv ) {

        my $sql = "SELECT job_id, input_id FROM job WHERE job_id in ($job_ids_csv)";
        my $sth = $self->prepare( $sql );
        $sth->execute();

        while(my ($job_id, $input_id) = $sth->fetchrow_array() ) {
            if($input_id =~ /^_ext(?:\w+)_data_id (\d+)$/) {
                $input_id = $self->db->get_AnalysisDataAdaptor->fetch_by_analysis_data_id_TO_data($1);
            }
            $input_ids{$job_id * $id_scale + $id_offset} = $input_id;
        }
    }
    return \%input_ids;
}


1;