JiPaix/xdccJS

View on GitHub
src/downloader.ts

Summary

Maintainability
C
1 day
Test Coverage
/* eslint-disable max-len */
/* eslint-disable no-param-reassign */
import * as fs from 'fs';
import * as net from 'net';
import { PassThrough } from 'stream';
import { ThrottleGroup } from 'stream-throttle';
import { CtcpParser, ParamsCTCP } from './ctcp_parser';
import type { FileInfo } from './interfaces/fileinfo';
import type { Job } from './interfaces/job';
import getIp from './lib/get-ip';
import * as ProgressBar from './lib/progress';

export type ParamsDL = ParamsCTCP & {
  /**
   * Array of ports for passive DCC
   * @default [5001]
   * @remark Some xdcc bots use passive dcc, this require to have these ports opened on your computer/router/firewall
   * @example
   * ```js
   * params.passivePort = [3213, 3214]
   * ```
   */
  passivePort: number[]
  /**
   * Throttle speed (KiB/s)
   * @default undefined
   * @example
   * ```js
   * param.throttle = 500
   * ```
   */
  throttle?:number
}

interface Pass {
  startTime?: number
  server?: net.Server
  client?: net.Socket
  stream: fs.WriteStream | PassThrough
  candidate: Job
  fileInfo: FileInfo
  pick?: number
  bar?: ProgressBar
  received: number,
  bufferType: '64bit' | '32bit'
}

export default class Downloader extends CtcpParser {
  protected passivePort: number[];

  private ip: Promise<{
    v4: string | undefined;
    v6: string | undefined;
  }>;

  protected throttle?: number;

  constructor(params: ParamsDL) {
    super(params);
    this.ip = Downloader.getIp();
    this.passivePort = params.passivePort;
    this.throttle = params.throttle;

    this.on('prepareDL', (downloadrequest: { fileInfo: FileInfo; candidate: Job }) => {
      this.prepareDL(downloadrequest);
    });
  }

  static async getIp() {
    const ip = await getIp();
    let v4:string|undefined;
    if (ip.v4) {
      const d = ip.v4.split('.');
      const results = ((+d[0] * 256 + +d[1]) * 256 + +d[2]) * 256 + +d[3];
      v4 = `${results}`;
    }
    return { v4, v6: ip.v6 };
  }

  private setupStream(fileInfo: FileInfo): fs.WriteStream | PassThrough {
    if (this.path) {
      if (fileInfo.type === 'DCC ACCEPT') {
        return fs.createWriteStream(fileInfo.filePath, {
          flags: 'r+',
          start: fileInfo.position,
        });
      } if (fileInfo.type === 'DCC SEND') {
        return fs.createWriteStream(fileInfo.filePath);
      }
      throw Error('Error in control flow: setupStream');
    } else {
      return new PassThrough();
    }
  }

  private prepareDL(downloadrequest: { fileInfo: FileInfo; candidate: Job }): void {
    const { fileInfo } = downloadrequest;
    const { candidate } = downloadrequest;
    const stream = this.setupStream(fileInfo);
    if (fileInfo.port === 0) {
      this.emit('debug', 'xdccJS:: TCP_INCOMING_READY');
      const pick = this.portPicker();
      const server = net.createServer((client) => {
        this.SetupTimeout({
          candidate,
          eventType: 'error',
          message: '%danger% Timeout: no initial connnection',
          delay: this.timeout,
          disconnectAfter: {
            stream,
            server,
            socket: client,
            pick,
          },
          padding: 6,
          fileInfo,
        });
        this.processDL(server, client, stream, candidate, fileInfo, pick);
      });

      const listenIp = candidate.opts && candidate.opts.ipv6 ? '::' : '0.0.0.0';

      server.listen(pick, listenIp, () => {
        this.ip.then((ip) => {
          this.raw(
            `PRIVMSG ${candidate.nick} ${String.fromCharCode(1)}DCC SEND ${fileInfo.file} ${ip} ${pick} ${
              fileInfo.length
            } ${fileInfo.token}${String.fromCharCode(1)}`,
          );
        }).catch((e) => {
          const pass = {
            server,
            stream,
            candidate,
            fileInfo,
            pick,
            received: 0,
            bufferType: fileInfo.length > 4294967295 ? '64bit' : '32bit' as '64bit' | '32bit',
          };
          this.onError(pass, e);
        });
      });
    } else {
      this.emit('debug', 'xdccJS:: TCP_OUTGOING_READY');
      const client = net.connect(fileInfo.port, fileInfo.ip);
      this.processDL(undefined, client, stream, candidate, fileInfo, undefined);
    }
  }

  private portPicker(): number | undefined {
    const available = this.passivePort.filter((ports) => !this.portInUse.includes(ports));
    const pick = available[Math.floor(Math.random() * available.length)];
    this.portInUse.push(pick);
    return pick;
  }

  private processDL(
    server: net.Server | undefined,
    client: net.Socket,
    stream: fs.WriteStream | PassThrough,
    candidate: Job,
    fileInfo: FileInfo,
    pick: number | undefined,
  ): void {
    candidate.cancel = this.makeCancelable(candidate, client);
    this.print(`%info% downloading : %cyan%${fileInfo.file}`, 5);
    const bar = Downloader.setupProgressBar(fileInfo.length);

    const pass: Pass = {
      server,
      client,
      stream,
      candidate,
      fileInfo,
      pick,
      bar,
      received: 0,
      bufferType: fileInfo.length > 4294967295 ? '64bit' : '32bit',
    };
    client.setTimeout(this.timeout);
    client.on('timeout', () => this.onTimeOut(pass));
    client.on('error', (e) => this.onError(pass, e));
    const sendBuffer = Buffer.alloc(pass.bufferType === '64bit' ? 8 : 4);

    const throttle = candidate.opts ? candidate.opts.throttle : this.throttle;

    if (throttle) {
      const tg = new ThrottleGroup({ rate: throttle });
      const thr = client.pipe(tg.throttle({ rate: throttle }));
      thr.on('data', (data) => this.onData(pass, data, sendBuffer));
    } else {
      client.on('data', (data) => this.onData(pass, data, sendBuffer));
    }
    client.once('data', () => this.emit('debug', 'xdccJS:: TCP_DOWNLOADING'));
    client.on('close', (e) => this.onClose(pass, e));
  }

  private onTimeOut(args: Pass): void {
    if (args.received === args.fileInfo.length) return;
    this.emit('debug', 'xdccJS:: TCP_TIMEOUT');
    this.SetupTimeout({
      candidate: args.candidate,
      eventType: 'error',
      message: 'Timeout',
      delay: 0,
      disconnectAfter: {
        stream: args.stream,
        server: args.server,
        socket: args.client,
        pick: args.pick,
        bar: args.bar,
      },
      padding: 6,
      fileInfo: args.fileInfo,
      executeLater: () => {
        this.redownload(args.candidate, args.fileInfo);
      },
    });
  }

  private onError(args: Pass, e: Error): void {
    if (args.received === args.fileInfo.length) return;

    if (e.message === 'cancel') this.emit('debug', 'xdccJS:: TCP_CANCEL');
    else this.emit('debug', `xdccJS:: TCP_ERROR @ ${e.message}`);

    this.SetupTimeout({
      candidate: args.candidate,
      eventType: e.message === 'cancel' ? 'cancel' : 'error',
      message:
          e.message === 'cancel'
            ? 'Cancelled by user'
            : `Connection error: %yellow%${e.message}`,
      delay: 0,
      disconnectAfter: {
        stream: args.stream,
        server: args.server,
        socket: args.client,
        pick: args.pick,
        bar: args.bar,
      },
      padding: 4,
      fileInfo: args.fileInfo,
      executeLater: () => {
        if (e.message === 'cancel') {
          args.candidate.failures.push(args.candidate.now);
          args.candidate.queue = [];
          if (fs.existsSync(args.fileInfo.filePath)) {
            fs.unlinkSync(args.fileInfo.filePath);
          }
          this.emit('next', args.candidate, this.verbose);
        } else {
          this.redownload(args.candidate, args.fileInfo);
        }
      },
    });
  }

  private onClose(args: Pass, e: boolean): void {
    if (e && args.received !== args.fileInfo.length) {
      this.emit('debug', `xdccJS:: TCP_CLOSE_ERROR @ ${e}`);
      return;
    }
    this.print('%success% done.', 6);
    this.emit('debug', 'xdccJS:: TCP_CLOSE_SUCCESS');
    args.candidate.timeout.clear();
    args.candidate.success.push(args.fileInfo.file);
    if (args.server && args.pick) {
      args.server.close(() => {
        this.portInUse = this.portInUse.filter((p) => p !== args.pick);
      });
    }
    args.stream.end();
    this.emit('downloaded', args.fileInfo);
    args.candidate.emit('downloaded', args.fileInfo);
    this.emit('next', args.candidate, this.verbose);
  }

  private onData(args: Pass, data: Buffer, sendBuffer: Buffer): void {
    const startTime = args.startTime || Date.now();

    if (args.received === 0) {
      args.startTime = startTime;
      args.candidate.timeout.clear();
      if (!this.path) {
        args.candidate.emit('pipe', args.stream, args.fileInfo);
        this.emit('pipe', args.stream, args.fileInfo);
      }
    }
    args.stream.write(data);
    args.received += data.length;

    const elapsedTime = Date.now() - startTime;
    const downloadSpeed = args.received / elapsedTime
    const remainingData = args.fileInfo.length - args.received;
    const eta = remainingData / downloadSpeed;

    if (args.bufferType === '64bit') {
      sendBuffer.writeBigUInt64BE(BigInt(args.received), 0);
    }

    if (args.bufferType === '32bit') {
      sendBuffer.writeUInt32BE(args.received, 0);
    }

    if (this.verbose && args.bar) args.bar.tick(data.length);
    if (!args.client?.destroyed && args.client?.writable) {
      args.client.write(sendBuffer);
    }
    args.candidate.emit('downloading', args.fileInfo, args.received, (args.received / args.fileInfo.length) * 100, eta);
    this.emit('downloading', args.fileInfo, args.received, (args.received / args.fileInfo.length) * 100, eta);
  }

  protected static setupProgressBar(len: number): ProgressBar {
    return new ProgressBar(''.padStart(6) + CtcpParser.replace(':roll [:bar] ETA: :eta @ :rate - :percent'), {
      complete: '=',
      incomplete: ' ',
      width: 20,
      total: len,
      clear: true,
      renderThrottle: 100,
    });
  }
}