configrd/configrd-service

View on GitHub
src/main/java/io/configrd/core/git/GitConfigSource.java

Summary

Maintainability
C
1 day
Test Coverage
package io.configrd.core.git;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.stream.Collectors;
import org.eclipse.jgit.api.CloneCommand;
import org.eclipse.jgit.api.CommitCommand;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.GitCommand;
import org.eclipse.jgit.api.MergeCommand;
import org.eclipse.jgit.api.MergeResult;
import org.eclipse.jgit.api.PullCommand;
import org.eclipse.jgit.api.PullResult;
import org.eclipse.jgit.api.PushCommand;
import org.eclipse.jgit.api.TransportCommand;
import org.eclipse.jgit.api.TransportConfigCallback;
import org.eclipse.jgit.api.errors.GitAPIException;
import org.eclipse.jgit.api.errors.InvalidRemoteException;
import org.eclipse.jgit.api.errors.JGitInternalException;
import org.eclipse.jgit.dircache.DirCache;
import org.eclipse.jgit.merge.MergeStrategy;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.transport.JschConfigSessionFactory;
import org.eclipse.jgit.transport.OpenSshConfig.Host;
import org.eclipse.jgit.transport.PushResult;
import org.eclipse.jgit.transport.RefSpec;
import org.eclipse.jgit.transport.RemoteRefUpdate;
import org.eclipse.jgit.transport.SshSessionFactory;
import org.eclipse.jgit.transport.SshTransport;
import org.eclipse.jgit.transport.Transport;
import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
import org.eclipse.jgit.util.FS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import io.configrd.core.exception.InitializationException;
import io.configrd.core.source.DefaultConfigSource;
import io.configrd.core.source.FileConfigSource;
import io.configrd.core.source.FileStreamSource;
import io.configrd.core.source.PropertyPacket;
import io.configrd.core.source.StreamPacket;
import io.configrd.core.source.StreamSource;
import io.configrd.core.source.WritableConfigSource;
import io.configrd.core.util.StringUtils;
import io.configrd.core.util.URIBuilder;

public class GitConfigSource extends DefaultConfigSource<GitStreamSource>
    implements FileConfigSource, WritableConfigSource, Closeable {

  private final static Logger logger = LoggerFactory.getLogger(GitConfigSource.class);

  private static final Timer timer = new Timer(true);

  public final GitCredentials creds;

  private final TimerTask pullTask = new TimerTask() {

    @Override
    public void run() {
      gitPull();
    }
  };

  private Git git;

  public void init() {
    gitClone();

    if (getStreamSource().getSourceConfig().getRefresh() > 0) {
      logger.info("Setting timed pulling at " + getStreamSource().getSourceConfig().getRefresh()
          + " seconds");
      timer.scheduleAtFixedRate(pullTask, getStreamSource().getSourceConfig().getRefresh() * 1000,
          getStreamSource().getSourceConfig().getRefresh() * 1000);
    }
  }

  private void setupCredentials(String scheme, TransportCommand<? extends GitCommand, ?> command) {

    if (getStreamSource().getSourceConfig().getAuthMethod() != null
        && (scheme == null || (!scheme.toLowerCase().startsWith("http")
            && !scheme.toLowerCase().startsWith("git:")))) {

      SshSessionFactory sshSessionFactory = new JschConfigSessionFactory() {
        @Override
        protected void configure(Host host, Session session) {
          session.setConfig("StrictHostKeyChecking", "no");
        }

        @Override
        protected JSch createDefaultJSch(FS fs) throws JSchException {
          JSch defaultJSch = super.createDefaultJSch(fs);
          defaultJSch.addIdentity(getStreamSource().getSourceConfig().getUsername(),
              getStreamSource().getSourceConfig().getPassword());
          return defaultJSch;
        }
      };

      command.setTransportConfigCallback(new TransportConfigCallback() {
        @Override
        public void configure(Transport transport) {

          SshTransport sshTransport = (SshTransport) transport;
          sshTransport.setSshSessionFactory(sshSessionFactory);
        }
      });

    } else {

      if (getStreamSource().getSourceConfig().getAuthMethod() != null) {
        command.setCredentialsProvider(
            new UsernamePasswordCredentialsProvider(creds.getUsername(), creds.getPassword()));
      }
    }
  }

  @Override
  public void close() throws IOException {
    if (git != null)
      git.close();

    pullTask.cancel();
  }

  private boolean gitPush(RevCommit commit) {

    boolean success = true;

    RefSpec spec = null;
    try {

      spec = new RefSpec(commit.name() + ":" + git.getRepository().getFullBranch());

    } catch (Exception e) {
      throw new IllegalStateException(e);
    }

    // Goal is to overwrite remote with local state, it's a PUT
    PushCommand push = git.push().setForce(true).setRefSpecs(spec);

    logger.debug("Pushing " + spec);

    setupCredentials(getStreamSource().getSourceConfig().toURIish().getScheme(), push);

    try {

      Iterable<PushResult> result = push.call();

      Set<String> messages = new HashSet<>();

      for (PushResult r : result) {

        success = success && r.getRemoteUpdates().stream()
            .allMatch(s -> (RemoteRefUpdate.Status.OK.equals(s.getStatus())
                || RemoteRefUpdate.Status.UP_TO_DATE.equals(s.getStatus())));

        if (!success) {
          messages.addAll(r.getRemoteUpdates().stream()
              .filter(u -> !StringUtils.hasText(u.getMessage())).map(u -> {
                return u.getStatus() + " - " + u.getMessage();
              }).collect(Collectors.toSet()));
        }
      }

      if (!messages.isEmpty()) {
        logger.error("Push of ref spec " + spec + " failed.");
        for (String s : messages) {
          logger.error(s);
        }
      } else {
        success = true;
        logger.info("Push " + spec + " succeeded.");
      }

    } catch (Exception e) {
      throw new RuntimeException(e);
    }

    return success;
  }

  private boolean gitPull() {

    Boolean result = Boolean.FALSE;

    gitConnect();

    MergeResult rr = null;
    try {

      PullCommand pc = git.pull().setStrategy(MergeStrategy.RECURSIVE)
          .setFastForward(MergeCommand.FastForwardMode.NO_FF);

      setupCredentials(getStreamSource().getSourceConfig().toURIish().getScheme(), pc);

      PullResult pullRes = pc.call();
      rr = pullRes.getMergeResult();

      if (rr != null && rr.getMergeStatus().isSuccessful()) {
        result = Boolean.TRUE;
        logger.debug(rr.toString());
      } else if (rr != null) {
        logger.error("Merge on pull failed with " + rr.toString());
      }


    } catch (Exception e) {
      logger.error(e.getMessage());

      if (e.getMessage().toLowerCase().contains(("ref may not exist"))) {
        result = true;
      }
    }

    return result;
  }

  private boolean gitClone() {

    String cloneFrom = getStreamSource().getSourceConfig().getUri();

    URI cloneTo = getStreamSource().toClone();

    logger.info("Cloning " + cloneFrom + " into " + cloneTo);

    try {

      CloneCommand clone = Git.cloneRepository().setURI(cloneFrom).setDirectory(new File(cloneTo));

      if (StringUtils.hasText(getStreamSource().getSourceConfig().getBranchName())) {
        clone = clone.setBranch(getStreamSource().getSourceConfig().getBranchName());
      }

      setupCredentials(getStreamSource().getSourceConfig().toURIish().getScheme(), clone);

      git = clone.call();

    } catch (JGitInternalException e) {

      if (e.getMessage().toLowerCase().contains("already exists")) {
        logger.warn("Clone " + cloneTo + " already exists. Refreshing.");
        return gitPull();
      }

    } catch (InvalidRemoteException e2) {
   
      throw new IllegalArgumentException(e2);
   
    } catch (GitAPIException e3) {
    
      throw new InitializationException(e3.getMessage());

    }
    return true;

  }

  private void gitConnect() {

    if (git == null) {
      final URI cloneTo = URIBuilder.create().setScheme("file")
          .setPath(getStreamSource().getSourceConfig().getLocalClone(),
              getStreamSource().getSourceConfig().getName())
          .build();

      try {
        git = Git.open(new File(cloneTo));

        if (StringUtils.hasText(getStreamSource().getSourceConfig().getBranchName()))
          git.checkout().setName(getStreamSource().getSourceConfig().getBranchName()).call();

      } catch (Exception e) {
        logger.error(e.getMessage());
      }
    }

  }

  public GitConfigSource(GitStreamSource source, Map<String, Object> values, GitCredentials creds) {
    super(source, values);
    this.creds = creds;
  }

  @Override
  public Map<String, Object> getRaw(String path) {

    Optional<? extends PropertyPacket> stream = streamSource.stream(path);

    if (!stream.isPresent())
      return new HashMap<>();

    return stream.get();
  }

  @Override
  public Map<String, Object> get(String path, Set<String> names) {

    if (getStreamSource().getSourceConfig().getRefresh() < 1) {
      if (!gitPull()) {
        logger.error("Pull operation failed. Possibly reading stale values");
      }
    }

    return super.get(path, names);
  }

  @Override
  public boolean isCompatible(StreamSource source) {
    return (source instanceof GitStreamSource);
  }

  @Override
  public Optional<StreamPacket> getFile(String path) {
    return ((FileStreamSource) streamSource).streamFile(path);
  }

  private RevCommit gitCommit(final String message) {

    RevCommit revCommit = null;
    try {

      CommitCommand commit = git.commit().setMessage(message);
      revCommit = commit.call();

      logger.info("Committed " + revCommit.getName() + ", " + revCommit.getFullMessage());

    } catch (Exception e) {
      logger.error(e.getMessage());
      throw new RuntimeException(e);
    }

    return revCommit;
  }

  private DirCache gitAdd(PropertyPacket packet) {

    // run the add
    URI fileName = getStreamSource().toRelative(packet);

    try {
      DirCache cache = git.add().addFilepattern(fileName.toString()).call();

      for (int i = 0; i < cache.getEntryCount(); i++) {
        logger.debug("Added " + cache.getEntry(i));
      }

      return cache;

    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
  }

  @Override
  public boolean put(String path, Map<String, Object> props) {

    PropertyPacket packet = null;
    if (!(props instanceof PropertyPacket)) {

      packet = new PropertyPacket(URI.create(path));
      packet.putAll(props);

    } else {

      packet = (PropertyPacket) props;

    }

    gitConnect();

    synchronized (git) {

      if (!packet.isEmpty() && gitPull()) {

        boolean success = getStreamSource().put(path, packet);

        DirCache c = gitAdd(packet);
        RevCommit commit = gitCommit("Updated " + packet.size() + " entries.");
        return gitPush(commit);

      } else {
        logger.warn("Packet to write to " + packet.getUri() + " is empty. Nothing changed.");
      }
    }

    return false;
  }

  @Override
  public boolean patch(String path, String etag, Map<String, Object> props) {
    // TODO Auto-generated method stub
    return false;
  }

}