stun/src/main/java/com/github/freeacs/stun/JobKickThread.java
package com.github.freeacs.stun;
import com.github.freeacs.common.util.Sleep;
import com.github.freeacs.dbi.ACS;
import com.github.freeacs.dbi.ACSUnit;
import com.github.freeacs.dbi.DBI;
import com.github.freeacs.dbi.Group;
import com.github.freeacs.dbi.Inbox;
import com.github.freeacs.dbi.Job;
import com.github.freeacs.dbi.JobStatus;
import com.github.freeacs.dbi.Message;
import com.github.freeacs.dbi.Parameter;
import com.github.freeacs.dbi.Parameter.Operator;
import com.github.freeacs.dbi.Parameter.ParameterDataType;
import com.github.freeacs.dbi.Profile;
import com.github.freeacs.dbi.Unit;
import com.github.freeacs.dbi.UnitJob;
import com.github.freeacs.dbi.UnitJobs;
import com.github.freeacs.dbi.UnitParameter;
import com.github.freeacs.dbi.Unittype;
import com.github.freeacs.dbi.Unittype.ProvisioningProtocol;
import com.github.freeacs.dbi.UnittypeParameter;
import com.github.freeacs.dbi.Unittypes;
import com.github.freeacs.dbi.util.SystemParameters;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JobKickThread implements Runnable {
private static final Logger log = LoggerFactory.getLogger(JobKickThread.class);
private final Properties properties;
private final DBI dbi;
private final DataSource xapsCp;
/** Key: jobId Value: Set of unitId */
private Map<Integer, Set<String>> jobKickMap;
/** Key: jobId Value: Tms of last refresh */
private final Map<Integer, Long> jobRefreshMap = new HashMap<>();
/** This inbox listens for changes on job (from other modules in xAPS). */
private final Inbox jobChangeInbox = new Inbox();
public JobKickThread(DataSource xapsCp, DBI dbi, Properties properties) {
this.xapsCp = xapsCp;
this.dbi = dbi;
this.properties = properties;
jobChangeInbox.addFilter(new Message(null, Message.MTYPE_PUB_CHG, null, Message.OTYPE_JOB));
dbi.registerInbox("jobChangeInbox", jobChangeInbox);
}
private void populateJobKickMapForOneJob(Job job, ACS acs, Unittype unittype)
throws SQLException {
Group group = job.getGroup();
long now = jobRefreshMap.get(job.getId());
ACSUnit acsUnit = new ACSUnit(xapsCp, acs, acs.getSyslog());
Map<String, Unit> unitsInGroup = acsUnit.getUnits(group);
log.info(
"Found "
+ unitsInGroup.size()
+ " units in group "
+ group.getName()
+ " from job "
+ job.getName()
+ " ("
+ job.getId()
+ ")");
Group topParent = group.getTopParent();
Profile profile = topParent.getProfile();
UnittypeParameter historyUtp =
unittype.getUnittypeParameters().getByName(SystemParameters.JOB_HISTORY);
Parameter historyParam =
new Parameter(historyUtp, "%," + job.getId() + ":%", Operator.EQ, ParameterDataType.TEXT);
Map<String, Unit> unitsCompleted =
acsUnit.getUnits(unittype, profile, historyParam, Integer.MAX_VALUE);
Iterator<Unit> iterator = unitsCompleted.values().iterator();
int runButRunAgainCounter = 0;
if (job.getRepeatCount() != null
&& job.getRepeatCount() > 0
&& job.getRepeatInterval() != null) {
while (iterator.hasNext()) {
Unit u = iterator.next();
UnitParameter up = u.getUnitParameters().get(historyUtp.getName());
String historyParameterValue = up.getValue();
for (String entry : historyParameterValue.split(",")) {
if (entry.trim().isEmpty()) {
continue;
}
JobHistoryEntry jhEntry = new JobHistoryEntry(entry);
if (jhEntry.getJobId().intValue() != job.getId()
|| jhEntry.getRepeatedCount() >= job.getRepeatCount()) {
continue;
}
long timeSinceLastRun = now - jhEntry.getLastRunTms();
if (timeSinceLastRun <= job.getRepeatInterval() * 1000) {
continue;
}
// The job was run before, but it should now be run again, hence: not completed
iterator.remove();
runButRunAgainCounter++;
}
}
}
String msg =
"Found "
+ unitsCompleted.size()
+ " units in group "
+ group.getName()
+ " from job "
+ job.getName()
+ " ("
+ job.getId()
+ ") already completed.";
if (runButRunAgainCounter > 0) {
msg += " (" + runButRunAgainCounter + " have run before, but will be repeated)";
}
log.info(msg);
List<Parameter> upList = new ArrayList<>();
UnittypeParameter currentUtp =
unittype.getUnittypeParameters().getByName(SystemParameters.JOB_CURRENT);
Parameter currentParam = new Parameter(currentUtp, "%," + job.getId() + ":%");
upList.add(currentParam);
Map<String, Unit> unitsInProcess =
acsUnit.getUnits(unittype, profile, upList, Integer.MAX_VALUE);
log.info(
"Found "
+ unitsInProcess.size()
+ " units in group "
+ group.getName()
+ " from job "
+ job.getName()
+ " ("
+ job.getId()
+ ") in process.");
Set<String> unitSet = new HashSet<>();
for (String unitId : unitsInGroup.keySet()) {
if (unitsCompleted.get(unitId) == null && unitsInProcess.get(unitId) == null) {
unitSet.add(unitId);
log.debug("Added " + unitId + " to list of units to run a telnet'ed");
}
}
jobKickMap.put(job.getId(), unitSet);
}
private void populateJobKickMapForAllJobs(ACS acs) throws SQLException {
jobKickMap = new HashMap<>();
Unittype[] unittypes = acs.getUnittypes().getUnittypes();
for (Unittype unittype : unittypes) {
if (ProvisioningProtocol.TR069 != unittype.getProtocol()) {
continue;
}
Job[] jobs = unittype.getJobs().getJobs();
for (Job job : jobs) {
if ("KICK".equals(job.getFlags().getType().name())) {
if (JobStatus.STARTED.equals(job.getStatus())) {
if (jobKickMap.get(job.getId()) == null) {
log.info(
"Job "
+ job.getName()
+ " ("
+ job.getId()
+ ") is STARTED and discovered for the first time.");
jobKickMap.put(job.getId(), new HashSet<>());
jobRefreshMap.put(job.getId(), System.currentTimeMillis());
populateJobKickMapForOneJob(job, acs, unittype);
} else {
long lastRefresh = jobRefreshMap.get(job.getId());
if (lastRefresh + properties.getKickRescan() * 60000 < System.currentTimeMillis()) {
log.info("Job " + job.getId() + " is STARTED and refreshed.");
jobRefreshMap.put(job.getId(), System.currentTimeMillis());
populateJobKickMapForOneJob(job, acs, unittype);
}
}
} else if (jobKickMap.get(job.getId()) != null) {
log.info(
"Job "
+ job.getName()
+ " ("
+ job.getId()
+ ") is not STARTED and no more units will be kicked from this job");
jobKickMap.remove(job.getId());
}
}
}
}
}
private Job findJobById(Integer jobId) {
for (Unittype unittype : dbi.getAcs().getUnittypes().getUnittypes()) {
Job job = unittype.getJobs().getById(jobId);
if (job != null) {
return job;
}
}
return null;
}
private void kickJobs(ACS acs) {
int kickInterval = properties.getKickInterval();
Sleep kickSleep = new Sleep(kickInterval, kickInterval / 10, false);
for (Map.Entry<Integer, Set<String>> entry : jobKickMap.entrySet()) {
Integer jobId = entry.getKey();
Job job = findJobById(jobId);
Set<String> unitIdSet = entry.getValue();
Iterator<String> iterator = unitIdSet.iterator();
while (iterator.hasNext()) {
String unitId = iterator.next();
try {
if (job != null && newJobStartedOrRunningJobStopped(job)) {
return;
}
kickSleep.sleep();
ACSUnit acsUnit = new ACSUnit(xapsCp, acs, acs.getSyslog());
Unit unit = acsUnit.getUnitById(unitId);
if (unit != null) {
startUnitJob(unit, jobId, acs);
Kick.kick(unit, properties);
} else {
log.error(unitId + " was not found in xAPS, not possible to kick");
}
iterator.remove();
} catch (Throwable t) {
log.error(unitId + " experienced an error during kick, will be tried again later " + t);
}
}
}
}
public void run() {
try {
Sleep sleep = new Sleep(1000, 1000, true);
do {
sleep.sleep();
if (Sleep.isTerminated()) {
break;
}
ACS acs = dbi.getAcs();
populateJobKickMapForAllJobs(acs);
kickJobs(acs);
} while (true);
} catch (Throwable t) {
log.error(
"An error ocurred, JobKickSpawner exits - server is not able to process job-kick anymore!!!",
t);
}
}
private void startUnitJob(Unit u, Integer jobId, ACS acs) throws SQLException {
ACSUnit acsUnit = new ACSUnit(xapsCp, acs, acs.getSyslog());
Unittype unittype = u.getUnittype();
UnittypeParameter currentUtp =
unittype.getUnittypeParameters().getByName(SystemParameters.JOB_CURRENT);
UnitParameter currentUp =
new UnitParameter(currentUtp, u.getId(), String.valueOf(jobId), u.getProfile());
List<UnitParameter> upList = new ArrayList<>();
upList.add(currentUp);
acsUnit.addOrChangeUnitParameters(upList);
UnitJobs unitJobs = new UnitJobs(xapsCp);
UnitJob uj = new UnitJob(u.getId(), jobId);
uj.setStartTimestamp(new Date());
boolean updated = unitJobs.start(uj);
log.debug(
u.getId()
+ " was marked as started in unit-job table (result from insert: "
+ updated
+ ")");
}
private boolean newJobStartedOrRunningJobStopped(Job j) {
if (!JobStatus.STARTED.equals(j.getStatus())) {
log.info("Job " + j.getId() + " is no longer running, aborting this job");
return true;
}
List<Message> messages = jobChangeInbox.getUnreadMessages();
for (Message jcMessage : messages) {
Inbox dbiInbox = dbi.retrieveInbox(DBI.PUBLISH_INBOX_NAME);
List<Message> dbiMessages = dbiInbox.getAllMessages();
String jcM = jcMessage.getMessageType() + jcMessage.getObjectType() + jcMessage.getObjectId();
log.debug("Job change inbox reported that job " + jcMessage.getObjectId() + " has changed");
boolean messageProcessed;
do {
messageProcessed = true;
for (Message dbiMessage : dbiMessages) {
String dbiM =
dbiMessage.getMessageType() + dbiMessage.getObjectType() + dbiMessage.getObjectId();
if (jcM.equals(dbiM)) {
log.debug(
"DBI has not yet processed the change message for job "
+ dbiMessage.getObjectId()
+ ", waiting 500 ms");
messageProcessed = false;
break;
}
}
if (messageProcessed) {
break;
}
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
}
} while (true);
log.debug("DBI has processed the change message for job " + jcMessage.getObjectId());
jobChangeInbox.markMessageAsRead(jcMessage);
Unittypes unittypes = dbi.getAcs().getUnittypes();
for (Unittype unittype : unittypes.getUnittypes()) {
Integer jobId = Integer.valueOf(jcMessage.getObjectId());
Job job = unittype.getJobs().getById(jobId);
if (job != null && JobStatus.STARTED.equals(job.getStatus())) {
if (jobKickMap.get(jobId) == null) {
log.info(
"Job "
+ jobId
+ " is STARTED, but is not part of the JobKickMap. Will abort and rebuild the queue.");
jobChangeInbox.deleteReadMessage();
return true;
} else {
log.debug(
"Job "
+ jobId
+ " is STARTED, but since it is part of the JobKickMap no special action is required.");
}
} else if (job != null) {
log.debug(
"Job "
+ jobId
+ " is changed, but since the status is not STARTED no special action is required.");
}
}
}
if (!messages.isEmpty()) {
jobChangeInbox.deleteReadMessage();
}
return false;
}
}