polkadot-js/client

View on GitHub
packages/client-sync/src/index.ts

Summary

Maintainability
B
4 hrs
Test Coverage
// Copyright 2017-2019 @polkadot/client-sync authors & contributors
// This software may be modified and distributed under the terms
// of the Apache-2.0 license. See the LICENSE file for details.

import { Config } from '@polkadot/client/types';
import { ChainInterface } from '@polkadot/client-chains/types';
import { PeerInterface, PeersInterface } from '@polkadot/client-p2p/types';
import { Hash, Header } from '@polkadot/types/interfaces';
import { SyncInterface, SyncStatePeerBlock, SyncStatePeerRequest, SyncStatus } from './types';

import BN from 'bn.js';
import EventEmitter from 'eventemitter3';
import { BlockData } from '@polkadot/client-types';
import { BlockAnnounce, BlockRequest, BlockResponse } from '@polkadot/client-types/messages';
import { BlockRequestDirection, BlockRequestFrom } from '@polkadot/client-types/messages/BlockRequest';
import { isBn, isU8a, logger, u8aToHex } from '@polkadot/util';

import defaults from './defaults';

const REQUEST_TIMEOUT = 60000;
const MAX_REQUEST_BN = new BN(defaults.MAX_REQUEST_BLOCKS);

const l = logger('sync');

export default class Sync extends EventEmitter implements SyncInterface {
  private chain: ChainInterface;

  private config: Config;

  private blockRequests: Map<string, SyncStatePeerRequest> = new Map();

  private blockQueue: Map<string, SyncStatePeerBlock> = new Map();

  private bestQueued: BN = new BN(0);

  private isActive = false;

  private lastBest: BN = new BN(0);

  private peers: PeersInterface | null;

  public bestSeen: BN = new BN(0);

  public status: SyncStatus = 'Idle';

  public constructor (config: Config, chain: ChainInterface) {
    super();

    this.chain = chain;
    this.config = config;
    this.peers = null;
    this.isActive = true;

    this.setProcessTimeout(false);
  }

  public stop (): void {
    this.isActive = false;
  }

  public setPeers (peers: PeersInterface): void {
    this.peers = peers;
  }

  private setProcessTimeout (isFast = true): void {
    // eslint-disable-next-line @typescript-eslint/no-misused-promises
    setTimeout(async (): Promise<void> => {
      try {
        await this.processBlocks();
      } catch (error) {
        // ignore
      }
    }, isFast ? 1 : 100);
  }

  private announce (header: Header): void {
    if (header.number.unwrap().lte(this.lastBest) || !this.peers || this.status === 'Sync') {
      return;
    }

    this.lastBest = header.number.unwrap();
    this.peers.peers().forEach((peer): void => {
      if (peer.bestNumber.lt(this.lastBest)) {
        peer.send(
          new BlockAnnounce({ header })
        );
      }
    });
  }

  private async processBlocks (): Promise<void> {
    const hasOne = await this.processBlock();

    this.setProcessTimeout(hasOne);
  }

  private setStatus (): void {
    this.status = this.blockQueue.size > defaults.MIN_IDLE_BLOCKS
      ? 'Sync'
      : 'Idle';
  }

  private hasBlockData (hash: Uint8Array): boolean {
    const data = this.chain.blocks.blockData.get(hash);

    return !!data && !!data.length;
  }

  private getNextBlock (): SyncStatePeerBlock | null {
    if (!this.isActive) {
      return null;
    }

    let result: SyncStatePeerBlock | null = null;

    this.blockQueue.forEach((queued, blockId): void => {
      if (!result) {
        const { block: { header } } = queued;

        if (this.hasBlockData(header.hash)) {
          this.blockQueue.delete(blockId);
        } else if (this.hasBlockData(header.parentHash)) {
          result = queued;
        }
      }
    });

    return result;
  }

  private async processBlock (): Promise<boolean> {
    this.setStatus();

    const nextImportable = this.getNextBlock();

    if (!nextImportable) {
      this.requestOther();

      return false;
    }

    const { blockId, block, peer } = nextImportable;
    const result = this.config.sync === 'full'
      ? await this.chain.executor.importBlock(block)
      : await this.chain.executor.importHeader(block);

    if (!result) {
      return false;
    }

    const queueCount = this.blockQueue.size;

    this.blockQueue.delete(blockId);

    if (queueCount < defaults.MIN_QUEUE_SIZE && !this.blockRequests.get(peer.id)) {
      this.requestBlocks(peer);
    }

    this.announce(block.header);
    this.emit('imported');

    return true;
  }

  private blocksFromHash (count: number, from: Hash, to: Hash | null, increment: BN): Uint8Array[] {
    const data = new BlockData(this.chain.blocks.blockData.get(from));

    // nothing here, just get out gracefully
    if (data.isEmpty) {
      return [];
    }

    return this.blocksFromNumber(count, data.header.number.unwrap(), to, increment);
  }

  private blocksFromNumber (count: number, from: BN, to: Hash | null, increment: BN): Uint8Array[] {
    const best = this.chain.blocks.bestNumber.get();
    const blocks: Uint8Array[] = [];
    let current = from;

    // l.debug(() => `Reading ${count} blocks from #${from} -> ${to}, ${increment} (best #${best})`);

    // get the requested number of blocks, either while not the best or not zero
    // (for ascending and decending respectively)
    while (count && current.lte(best) && !current.isZero()) {
      const hash = this.chain.blocks.hash.get(current);

      // make sure we have a valid hash
      if (!hash.length) {
        break;
      }

      const block = this.chain.blocks.blockData.get(hash);

      // we should have an actual block
      if (!block.length) {
        break;
      }

      blocks.push(block);

      // continue the loop if we have not reached out target
      // (below is the catch all for the various ifs, exiting)
      if (to && to.eq(hash)) {
        break;
      }

      // we have one more, add the increment for the next block
      count--;
      current = current.add(increment);
    }

    return blocks;
  }

  public provideBlocks (peer: PeerInterface, request: BlockRequest): void {
    const increment = request.direction.isAscending ? new BN(1) : new BN(-1);
    const count = Math.min(request.max.unwrapOr(MAX_REQUEST_BN).toNumber(), defaults.MAX_REQUEST_BLOCKS);
    const to = request.to.unwrapOr(null);
    const blocks = request.from.isHash
      ? this.blocksFromHash(count, request.from.asHash(), to, increment)
      : this.blocksFromNumber(count, request.from.asBlockNumber(), to, increment);

    l.debug((): string => `Providing ${blocks.length} blocks to ${peer.shortId}, ${request.from.toString()}+`);

    peer.send(
      new BlockResponse({
        blocks,
        id: request.id
      })
    );
  }

  public queueBlocks (peer: PeerInterface, { blocks, id }: BlockResponse): void {
    const request = this.blockRequests.get(peer.id);
    const bestNumber = this.chain.blocks.bestNumber.get();

    this.blockRequests.delete(peer.id);

    if (!request) {
      // l.warn(`Unrequested response from ${peer.shortId}`);
    } else if (!id.eq(request.request.id)) {
      // l.warn(`Mismatched response from ${peer.shortId}`);
    }

    let firstNumber: BN | null = null;
    let count = 0;

    for (let i = 0; i < blocks.length; i++) {
      const block = blocks[i];

      // console.error(JSON.stringify(block), block.toHex());

      const dbBlock = this.chain.blocks.blockData.get(block.hash);
      const blockNumber = block.header.number.unwrap();
      const blockId = blockNumber.toString();

      if ((dbBlock.length && blockNumber.lte(bestNumber)) || this.blockQueue.get(blockId)) {
        continue;
      }

      this.blockQueue.set(blockId, {
        blockId,
        block,
        peer
      });
      firstNumber = firstNumber || blockNumber;

      if (this.bestQueued.lt(blockNumber)) {
        this.bestQueued = blockNumber;
      }

      count++;
    }

    if (count && firstNumber) {
      l.debug(`Queued ${count} from ${peer.shortId}, #${firstNumber.toString()}+`);
    }

    // this.requestBlocks(peer);
  }

  private requestFromPeer (peer: PeerInterface, from: BN | Uint8Array | null, isStale: boolean): void {
    const isFromValid = !isBn(from) || from.lte(peer.bestNumber);

    if (this.blockRequests.get(peer.id) || !peer.isActive() || !from || !isFromValid) {
      return;
    }

    const isHash = isU8a(from);
    const fromStr = isHash
      ? u8aToHex(from as Uint8Array, 48)
      : `#${from.toString()}`;

    l.debug((): string => `Requesting from ${peer.shortId}, ${fromStr} ${isStale ? '(older)' : '-'}`);

    const request = new BlockRequest({
      direction: new BlockRequestDirection(isHash ? 'Descending' : 'Ascending'),
      // fields: new BlockRequest$Fields(
      //   this.config.sync === 'full'
      //     ? ['header', 'body', 'justification']
      //     : ['header']
      // ),
      from: new BlockRequestFrom(from, isHash ? 0 : 1),
      id: peer.getNextId(),
      max: defaults.MAX_REQUEST_BLOCKS
    });

    this.blockRequests.set(peer.id, {
      peer,
      request,
      timeout: Date.now() + REQUEST_TIMEOUT
    });

    peer.send(request);
  }

  public requestBlocks (peer: PeerInterface): void {
    this.timeoutRequests();

    const nextNumber = this.chain.blocks.bestNumber.get().addn(1);
    const from = this.bestQueued.lt(nextNumber)
      ? nextNumber
      : (
        this.bestQueued.sub(nextNumber).ltn(defaults.MIN_QUEUE_SIZE)
          ? this.bestQueued.addn(1)
          : null
      );

    if (peer.bestNumber.gt(this.bestSeen)) {
      this.bestSeen = peer.bestNumber;
    }

    this.requestFromPeer(peer, from, false);
  }

  private requestOther (): void {
    let result: SyncStatePeerBlock | null = null;

    this.blockQueue.forEach((current): void => {
      if (!result || current.block.header.number.unwrap().lt(result.block.header.number.unwrap())) {
        result = current;
      }
    });

    if (!result) {
      return;
    }

    this.requestFromPeer(
      (result as SyncStatePeerBlock).peer,
      (result as SyncStatePeerBlock).block.header.number.unwrap().subn(defaults.MAX_REQUEST_BLOCKS),
      true
    );
  }

  private timeoutRequests (): void {
    const now = Date.now();

    this.blockRequests.forEach((request, key): void => {
      if (request.timeout < now) {
        this.blockRequests.delete(key);
      }
    });
  }
}