Geek-Research-Lab/MeowJS

View on GitHub
NotAdded/video/p2p.js

Summary

Maintainability
A
2 hrs
Test Coverage
var p2p;
p2p(function() {
    var peer = ['peer.js'];
    var hpeer = ['httpPeer.js']; 
    var wsPeer = ['wsPeer.js']; 
    //var sys = ['sys.js'];
    var ig = this;
    var x = ['xx.js'];
    function sum(list) {
        return x.reduce(list, function (memo, num) {
            return memo + num;
        }, 0);
    }
    function now() {
        return (new Date()).getTime();
    }
    function client() {
        ig.blockPerConnect = 1;
        ig.connectLimit = 20;
        ig.chkPendingInterval = 10 * 1000; // => 10s
        ig.init();
    }
    client.prototype = {
        init: function() {
            /*
            ig.peerID = null;
            ig.meta = null;
            ig.file = null;
            ig.ws = null;
            ig.peers = {};
            ig.ready = false;
            ig.inUsePeer = {};
            ig.badPeer = {};
            ig.blockedPeer = {};
            ig.pieceQueue = [];
            ig.finishedPiece = [];
            ig.finishedBlock = [];
            ig.pendingBlock = {};
            ig.blockChunks = {};
            ig.sent = 0;
            ig.received = 0;
            ig.peerTrans = {};
            */
            var location, WebSocket;
            ig.lastSpeedReport = now();
            var speedReportInterval;
            speedReportInterval = setInterval(x.bind(ig.speedReport, ig), 1000);
            ig.ws = new WebSocket((location.protocol === 'https:' ? 'wss://': 'ws://')+location.host+'/link/ws');
            //ig.ws.onOpen = x.bind(ig.onWsOpen, ig);
            //ig.ws.onMessage = x.bind(ig.onWsMessage, ig);
        },
        /*
        newLink: function(meta) {
            ig.ws.send(JSON.stringify({
                cmd: 'newLink',
                meta: meta
            }));
        },
        enterLink: function(linkID) {
            ig.ws.send(JSON.stringify({
                cmd: 'enterLink',
                linkID: linkID
            }));
        },
        httpPeerAdd: function(url) {
            ig.ws.send(JSON.stringify({
                cmd: 'httpPeerAdd',
                url: url,
                bitmap: client.finishedPiece.join('')
            }));
        },
        updatePeerList: function() {
            ig.ws.send(JSON.stringify({
                cmd: 'getPeerList'
            }));
        },
        updateBitmap: function() {
            ig.ws.send(JSON.stringify({
                cmd: 'updateBitmap',
                bitmap: client.finishedPiece.join('')
            }));
        },
        strength: function() {
            if(!ig.meta) {
                return 0;
            }
            var m;
            var temp = [];
            for(m = 0; m < ig.meta.pieceCount; m++) {
                temp.push(0);
            }
            x.each(ig.peerList, function (value) {
                for(m = 0; m < ig.meta.pieceCount; m++) {
                    temp[m] += (value.bitmap[m] === '1' ? 1 : 0);
                }
            });
            var min = x.min(temp);
            return min+(x.filter(temp, function (num) {
                return num > min;
            }).length / temp.length);
        },

        // exporting
        onReady: function() {
            console.log('onReady');
        },
        onMeta: function(meta) {
            console.log('onMeta', meta);
        },
        onPeerList: function(peerList) {
            console.log('onPeerList', peerList);
        },
        onPeerConnect: function(peer) {
            console.log('onNewPeer', peer);
        },
        onPeerDisconnect: function(peer) {
            console.log('onNewPeer', peer);
        },
        onPiece: function(piece) {
            console.log('onPiece', piece);
        },
        onFinished: function() {
            console.log('onFinished');
        },
        onSpeedReport: function(report) {
            console.log('onSpeedReport', report);
        },
        */
        ensureConnection: function(peerID, connect) {
            if(ig.peers[peerID]) {
                return ig.peers[peerID];
            } else {
                var p;
                if(peerID.indexOf('http:') === 0 || peerID.indexOf('https:') === 0) {
                    p = new hpeer.peer(peerID, ig);
                } else if(peerID.indexOf('ws:') === 0 || peerID.indexOf('wss:') === 0) {
                    p = new wsPeer.peer(peerID, ig);
                } else {
                    p = new peer.peer(ig.ws, ig.peerID, peerID);
                }
                p.peerTransID = x.uniqueID('peer');

                ig.inUsePeer[peerID] = 0;
                ig.peers[peerID] = p;

                p.onMessage = x.bind(function (data) {
                    if(x.isObject(data) || data.indexOf('{') === 0) {
                        var msg = x.isObject(data) ? data : JSON.parse(data);
                        if(msg.cmd === 'requestBlock') {
                            ig.sendBlock(p, msg.piece, msg.block);
                        } else if(msg.cmd === 'block') {
                            ig.receiveBlock(p, msg.piece, msg.block, msg.data);
                        }
                    } else {
                        var pieceBlock = data.slice(0, data.indexOf('|')).split(',');
                        data = data.slice(data.indexOf('|')+1);
                        ig.receiveBlock(p, parseInt(pieceBlock[0], 10), parseInt(pieceBlock[1], 10), data);
                    }
                }, ig);
                p.onClose = x.bind(function() {
                    console.log('peer connect with '+peerID+' disconnected;');
                    ig.removePending(peerID);
                    delete ig.peers[peerID];
                    if(x.isFunction(ig.onPeerDisconnect)) {
                        ig.onPeerDisconnect(p);
                    }
                }, ig);
                if(connect) {
                    p.connect();
                }
                if(x.isFunction(ig.onPeerConnect)) {
                    console.log('new connect to '+peerID);
                    ig.onPeerConnect(p);
                }
                return p;
            }
        },
        sendBlock: function(peer, piece, block) {
            if(!ig.file) {
                return;
            } if(ig.finishedPiece[piece] !== 1) {
                return;
            }
            var start = ig.meta.pieceSize * piece + ig.meta.blockSize * block;
            ig.file.readAsBinaryString(start, start+ig.meta.blockSize, function (data) {
                peer.send(''+piece+','+block+'|'+data);
            });
        },
        requestBlock: function(peer, piece, block) {
            ig.inUsePeer[peer.id] += 1;
            ig.pendingBlock[piece][block] = peer.id;
            peer.send({
                cmd: 'requestBlock',
                piece: piece,
                block: block
            });
        },
        speedReport: function() {
            x.map(x.value(ig.peers), function (peer) {
                ig.peerTrans[peer.peerTransID] = {
                    sent: peer.sent(),
                    received: peer.received()
                };
            });
            var sent = sum(x.pluck(x.value(ig.peerTrans), 'sent')) || 0;
            var received = sum(x.pluck(x.value(ig.peerTrans), 'received')) || 0;
            if(x.isFunction(ig.onSpeedReport)) {
                var elapsed = (now() - ig.lastSpeedReport) / 1000;
                ig.onSpeedReport({
                    send: (sent - ig.sent) / elapsed,
                    sent: sent,
                    receive: (received - ig.received) / elapsed,
                    received: received
                });
            }
            ig.sent = sent;
            ig.received = received;
            ig.lastSpeedReport = now();
        },
        pickupBlock: function() {
            if(x.isEmpty(ig.pieceQueue)) {
                return null;
            }
            var m, m2;
            var blockCount = Math.ceil(1.0 * ig.meta.pieceSize / ig.meta.blockSize);
            for(m = 0; m < ig.pieceQueue.length; m++) {
                var piece = ig.pieceQueue[m];
                // initiating if it's a new piece
                if(ig.blockChunks[piece] === undefined) {
                    ig.blockChunks[piece] = [];
                    ig.finishedBlock[piece] = [];
                    ig.pendingBlock[piece] = [];
                    for(m = 0; m < blockCount; ++m) {
                        ig.finishedBlock[piece][m] = 0;
                        ig.pendingBlock[piece][m] = 0;
                    }
                }
                for(m2 = 0; m2 < blockCount; ++m2) {
                    if(ig.finishedBlock[piece][m2] || ig.pendingBlock[piece][m2]) {
                        continue;
                    }
                    return [piece, m2];
                }
            }
            return null;
        },
        findAvailablePeer: function(piece) {
            var peers = [];
            for(var key in ig.peerList) {
                if(key === ig.peerID) {
                    continue;
                } if(ig.peerList[key].bitmap[piece] && (!x.has(ig.inUsePeer, key) || ig.inUsePeer[key] < ig.blockPerConnect) && !x.blockedPeer[key]) {
                    peers.push(key);
                }
            }
            if(peers.length === 0) {
                return null;
            } else if(peers.length === 1) {
                return peers[0];
            }
            var peersScore = x.map(peers, function (key) {
                return (ig.badPeer[key] || 0) * 1000 + (ig.peers[key] ? 0 : 1) * 100 + (ig.inUsePeer[key] || 0) * 10;
            });
            var temp = [];
            var minScore = x.min(peersScore);
            for(var m = 0; m < peers.length; m++) {
                if(peersScore[m] === minScore) {
                    temp.push(peers[m]);
                }
            }
            return temp[x.random(temp.length-1)];
        },
        startProcess: function() {
            // x.throttle(function() {...}, 100),
            while(x.size(ig.inUsePeer) < ig.connectLimit && ig.startProgress())
            {}
        },
        startProgress: function(block) {
            // pickup block
            var pieceBlock = ig.pickupBlock();
            if(pieceBlock === null) {
                return false;
            }
            var piece = pieceBlock[0];
            block = pieceBlock[1];
            // finding available peer
            var bestPeer = ig.findAvailablePeer(piece);
            if(bestPeer === null) {
                return false;
            }
            var peer = ig.ensureConnection(bestPeer, true);
            // marking
            ig.requestBlock(peer, piece, block);
            // Setting timeout for block
            //abandon all pending block when one is timeout
            x.delay(x.bind(ig.chkPending, ig, bestPeer, piece, block, peer.received(), ig.received, now()), ig.chkPendingInterval * 2);
            return true;
        },
        chkPending: function(peerID, piece, block, lastReceived, totalReceived, lastTime) {
            if(ig.pendingBlock[piece] && ig.pendingBlock[piece][block] === peerID && ig.peers[peerID]) {
                var received = ig.peers[peerID].received();
                var speed = (received - lastReceived) / (now() - lastTime) * 1000;
                var globalSpeed = (ig.received - totalReceived) / (now() - lastTime) * 1000;
                if(speed > globalSpeed / x.size(ig.peers) / 4) {
                    // quarter of the average speed
                    x.delay(x.bind(ig.chkPending, ig, peerID, piece, block, received, ig.received, now()), ig.chkPendingInterval);
                } else {
                    // timeout
                    console.log('low download speed from '+peerID+'...', speed, globalSpeed);
                    ig.badPeer[peerID] = ig.badPeer[peerID] || 0;
                    ig.badPeer[peerID] += 1;
                    // closing and blocking the peer for one block time
                    ig.peers[peerID].close();
                    ig.blockedPeer[peerID] = 998;
                    x.delay(x.bind(function() {
                        delete ig.blockedPeer[peerID];
                        delete ig.inUsePeer[peerID];
                        x.defer(x.bind(ig.startProcess, ig));
                    }, ig), (ig.meta.blockSize / speed > 120 ? 120: ig.meta.blockSize / speed) * 1000);
                }
            }
        },
        removePending: function(peerID) {
            for(var p in ig.pendingBlock) {
                for(var b = 0; b < ig.pendingBlock[p].length; b++) {
                    if(ig.pendingBlock[p][b] === peerID) {
                        ig.pendingBlock[p][b] = 0;
                    }
                }
            }
        },
        /*
        onBlockFinished: function(piece) {
            // finished piece
            if(x.all(ig.finishedBlock[piece])) {
                var Blob;
                var blob = new Blob(ig.blockChunks[piece]);
                ig.file.write(blob, ig.meta.pieceSize * piece, function() {
                    if(x.isFunction(ig.onPiece)) {
                        ig.onPiece(piece);
                    }
                    x.defer(x.bind(ig.updateBitmap, ig));
                    // check finished
                    if(x.all(ig.finishedPiece) && x.isEmpty(ig.pieceQueue) && x.isFunction(ig.onFinished)) {
                        ig.finishOnce = ig.finishOnce || x.once(ig.onFinished);
                        ig.finishOnce();
                    }
                });
                ig.finishedPiece[piece] = 1;
                if(ig.pieceQueue.indexOf(piece) !== -1) {
                    ig.pieceQueue.splice(ig.pieceQueue.indexOf(piece), 1);
                }
                delete ig.blockChunks[piece];
                delete ig.finishedBlock[piece];
                delete ig.pendingBlock[piece];
            }
            x.defer(x.bind(ig.startProcess, ig));
        },
        */
        /*
        onWsOpen: function() {},
        onWsMessage: function(xEvent) {
            var msg = JSON.parse(xEvent.data);
            if(!msg.cmd && msg.type && msg.origin) {
                var peer = ig.ensureConnection(msg.origin, false);
                peer.onWsMessage(msg);
            } else if(msg.cmd) {
                console.debug('p2p:', msg);
                switch (msg.cmd) {
                    case 'peerID':
                    ig.peerID = msg.peerID;
                    ig.ready = true;
                    if(x.isFunction(ig.onReady)) {
                        ig.onReady();
                    }
                    break;
                    case 'meta':
                    if(ig.meta !== null) {
                        break;
                    }
                    ig.meta = msg.meta;
                    for(var m = 0; m < ig.meta.pieceCount; ++m) {
                        ig.finishedPiece[m] = 0;
                        ig.pieceQueue.push(m);
                    }
                    ig.file = new sys.file(ig.meta.size, function() {
                        if(x.isFunction(ig.onMeta)) {
                            ig.onMeta(ig.meta);
                        }
                    });
                    break;
                    case 'peerList':
                    ig.peerList = msg.peerList;
                    if(x.isFunction(ig.onPeerList)) {
                        ig.onPeerList(ig.peerList);
                    }
                    break;
                    default:
                    break;
                }
            }
        } */
    };
    return {
        client: client
    };
});