jnidzwetzki/bitfinex-v2-wss-api-java

View on GitHub
src/main/java/com/github/jnidzwetzki/bitfinex/v2/PooledBitfinexApiBroker.java

Summary

Maintainability
A
2 hrs
Test Coverage
/*******************************************************************************
 *
 *    Copyright (C) 2015-2018 Jan Kristof Nidzwetzki
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 *
 *******************************************************************************/
package com.github.jnidzwetzki.bitfinex.v2;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import com.github.jnidzwetzki.bitfinex.v2.command.BitfinexCommand;
import com.github.jnidzwetzki.bitfinex.v2.command.SetConnectionFeaturesCommand;
import com.github.jnidzwetzki.bitfinex.v2.command.SubscribeCommand;
import com.github.jnidzwetzki.bitfinex.v2.command.UnsubscribeChannelCommand;
import com.github.jnidzwetzki.bitfinex.v2.entity.BitfinexApiKeyPermissions;
import com.github.jnidzwetzki.bitfinex.v2.manager.ConnectionFeatureManager;
import com.github.jnidzwetzki.bitfinex.v2.manager.OrderManager;
import com.github.jnidzwetzki.bitfinex.v2.manager.OrderbookManager;
import com.github.jnidzwetzki.bitfinex.v2.manager.PositionManager;
import com.github.jnidzwetzki.bitfinex.v2.manager.QuoteManager;
import com.github.jnidzwetzki.bitfinex.v2.manager.RawOrderbookManager;
import com.github.jnidzwetzki.bitfinex.v2.manager.TradeManager;
import com.github.jnidzwetzki.bitfinex.v2.manager.WalletManager;
import com.github.jnidzwetzki.bitfinex.v2.symbol.BitfinexStreamSymbol;
import com.github.jnidzwetzki.bitfinex.v2.util.EventsInTimeslotManager;

/**
 * BitfinexApiBroker client spreading amount of channels across multiple websocket connections.
 */
public class PooledBitfinexApiBroker implements BitfinexWebsocketClient {

    private final AtomicInteger numberOfClients = new AtomicInteger(0);
    private final Map<Integer, BitfinexWebsocketClient> clients = new ConcurrentHashMap<>();
    private final Map<BitfinexWebsocketClient, Set<BitfinexStreamSymbol>> pendingSubscriptions = new ConcurrentHashMap<>();

    private final BitfinexWebsocketConfiguration configuration;
    private final BitfinexApiCallbackRegistry callbackRegistry;
    private final SequenceNumberAuditor sequenceNumberAuditor;
    private final int maxChannelsPerClient;

    private final EventsInTimeslotManager connectEventManager;

    private final QuoteManager quoteManager;
    private final OrderbookManager orderbookManager;
    private final RawOrderbookManager rawOrderbookManager;
    private final OrderManager orderManager;
    private final TradeManager tradeManager;
    private final PositionManager positionManager;
    private final WalletManager walletManager;
    private final ConnectionFeatureManager connectionFeatureManager;

    public PooledBitfinexApiBroker(final BitfinexWebsocketConfiguration config, final BitfinexApiCallbackRegistry callbacks,
                                   final SequenceNumberAuditor seqNoAuditor, final int channelsPerConnection) {
            
        configuration = new BitfinexWebsocketConfiguration(config);
        callbackRegistry = callbacks;
        sequenceNumberAuditor = seqNoAuditor;
        maxChannelsPerClient = channelsPerConnection;

        connectEventManager = new EventsInTimeslotManager(1, configuration.getConnectionEstablishingDelay(), TimeUnit.MILLISECONDS);

        quoteManager = new QuoteManager(this, configuration.getExecutorService());
        orderbookManager = new OrderbookManager(this, configuration.getExecutorService());
        rawOrderbookManager = new RawOrderbookManager(this, configuration.getExecutorService());
        orderManager = new OrderManager(this, configuration.getExecutorService());
        tradeManager = new TradeManager(this, configuration.getExecutorService());
        positionManager = new PositionManager(this, configuration.getExecutorService());
        walletManager = new WalletManager(this, configuration.getExecutorService());
        connectionFeatureManager = new ConnectionFeatureManager(this, configuration.getExecutorService());

        callbackRegistry.onSubscribeChannelEvent(sym -> pendingSubscriptions.forEach((client, symbols) -> symbols.remove(sym)));
        callbackRegistry.onUnsubscribeChannelEvent(sym -> pendingSubscriptions.forEach((client, symbols) -> symbols.remove(sym)));

        SimpleBitfinexApiBroker authClient = new SimpleBitfinexApiBroker(configuration, callbackRegistry, seqNoAuditor, true);
        clients.put(numberOfClients.getAndIncrement(), authClient);
        pendingSubscriptions.put(authClient, ConcurrentHashMap.newKeySet());
    }

    @Override
    public void connect() {
        try {
            callbackRegistry.acceptConnectionStateChange(BitfinexConnectionStateEnum.CONNECTION_INIT);
            for (BitfinexWebsocketClient bitfinexWebsocketClient : clients.values()) {
                bitfinexWebsocketClient.connect();
            }
            callbackRegistry.acceptConnectionStateChange(BitfinexConnectionStateEnum.CONNECTION_SUCCESS);
        } catch (final Exception ex) {
            callbackRegistry.acceptConnectionStateChange(BitfinexConnectionStateEnum.CONNECTION_FAILED);
        }

    }

    @Override
    public void close() {
        try {
            callbackRegistry.acceptConnectionStateChange(BitfinexConnectionStateEnum.DISCONNECTION_INIT);
            for (BitfinexWebsocketClient bitfinexWebsocketClient : clients.values()) {
                bitfinexWebsocketClient.close();
            }
            callbackRegistry.acceptConnectionStateChange(BitfinexConnectionStateEnum.DISCONNECTION_SUCCESS);
        } catch (final Exception ex) {
            callbackRegistry.acceptConnectionStateChange(BitfinexConnectionStateEnum.DISCONNECTION_FAILED);
        }
    }

    @Override
    public void sendCommand(BitfinexCommand command) {
        if (command instanceof SetConnectionFeaturesCommand) {
            clients.values().forEach(c -> c.sendCommand(command));
            return;
        }
        BitfinexWebsocketClient client = clients.get(0);
        if (command instanceof SubscribeCommand) {
            BitfinexStreamSymbol symbol = ((SubscribeCommand) command).getSymbol();
            synchronized (this) {
                client = clients.values().stream()
                        .filter(c -> {
                            int subscribed = c.getSubscribedChannels().size();
                            int pending = pendingSubscriptions.get(c).size();
                            return subscribed + pending < maxChannelsPerClient;
                        })
                        .findFirst().orElseGet(this::createAndConnectClient);
                pendingSubscriptions.get(client).add(symbol);
            }
        }
        if (command instanceof UnsubscribeChannelCommand) {
            UnsubscribeChannelCommand unsubscribeCommand = (UnsubscribeChannelCommand) command;
            BitfinexStreamSymbol symbol = unsubscribeCommand.getSymbol();
            client = clients.values().stream()
                    .filter(c -> c.getSubscribedChannels().contains(symbol))
                    .findFirst().orElseThrow(IllegalStateException::new);
        }
        
        if(client == null) {
            throw new IllegalArgumentException("Client is null, please init first");
        }
        
        client.sendCommand(command);
    }

    public int websocketConnCount() {
        return numberOfClients.get();
    }

    @Override
    public boolean reconnect() {
        boolean retVal = false;
        try {
            callbackRegistry.acceptConnectionStateChange(BitfinexConnectionStateEnum.RECONNECTION_INIT);
            for (BitfinexWebsocketClient client : clients.values()) {
                retVal |= client.reconnect();
            }
            if (retVal) {
                callbackRegistry.acceptConnectionStateChange(BitfinexConnectionStateEnum.RECONNECTION_SUCCESS);
            } else {
                callbackRegistry.acceptConnectionStateChange(BitfinexConnectionStateEnum.RECONNECTION_FAILED);
            }
        } catch (final Exception ex) {
            callbackRegistry.acceptConnectionStateChange(BitfinexConnectionStateEnum.RECONNECTION_FAILED);
        }
        return retVal;
    }

    @Override
    public boolean unsubscribeAllChannels() {
        boolean retVal = true;
        for (BitfinexWebsocketClient client : clients.values()) {
            retVal &= client.unsubscribeAllChannels();
        }
        return retVal;
    }

    @Override
    public boolean isAuthenticated() {
        boolean retVal = false;
        for (BitfinexWebsocketClient client : clients.values()) {
            retVal |= client.isAuthenticated();
        }
        return retVal;
    }

    @Override
    public BitfinexApiKeyPermissions getApiKeyPermissions() {
        return clients.get(0).getApiKeyPermissions();
    }

    @Override
    public Collection<BitfinexStreamSymbol> getSubscribedChannels() {
        return clients.values().stream()
                .flatMap(c -> c.getSubscribedChannels().stream())
                .collect(Collectors.toList());
    }

    @Override
    public BitfinexWebsocketConfiguration getConfiguration() {
        return configuration;
    }

    @Override
    public BitfinexApiCallbackListeners getCallbacks() {
        return callbackRegistry;
    }

    @Override
    public QuoteManager getQuoteManager() {
        return quoteManager;
    }

    @Override
    public OrderbookManager getOrderbookManager() {
        return orderbookManager;
    }

    @Override
    public RawOrderbookManager getRawOrderbookManager() {
        return rawOrderbookManager;
    }

    @Override
    public PositionManager getPositionManager() {
        return positionManager;
    }

    @Override
    public OrderManager getOrderManager() {
        return orderManager;
    }

    @Override
    public TradeManager getTradeManager() {
        return tradeManager;
    }

    @Override
    public WalletManager getWalletManager() {
        return walletManager;
    }

    @Override
    public ConnectionFeatureManager getConnectionFeatureManager() {
        return connectionFeatureManager;
    }

    private BitfinexWebsocketClient createAndConnectClient() {
        BitfinexWebsocketConfiguration config = new BitfinexWebsocketConfiguration(configuration);
        config.setAuthenticationEnabled(false);
        config.setManagersActive(false);
        SimpleBitfinexApiBroker client = new SimpleBitfinexApiBroker(config, callbackRegistry, sequenceNumberAuditor, true);
        clients.put(numberOfClients.getAndIncrement(), client);
        if (connectEventManager.getNumberOfEventsInTimeslot() > 1) {
            try {
                connectEventManager.waitForNewTimeslot();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        connectEventManager.recordNewEvent();
        pendingSubscriptions.put(client, ConcurrentHashMap.newKeySet());
        client.connect();
        return client;
    }
}