atroo/node-weedfs

View on GitHub
lib/weedfs.js

Summary

Maintainability
F
4 days
Test Coverage
"use strict";

var qs = require('querystring');
var fs = require('fs');
var path = require('path');
var FormData = require('form-data');
var http = require("http");
var url = require("url");
var SeaweedFSError = require("./error");

function WeedFSClient(opts) {
    this.usePublicUrl = opts.usePublicUrl || false;
    this.clientOpts = opts || {};

    this.baseURL = "http://" + this.clientOpts.server + ":" + this.clientOpts.port + "/";
}

WeedFSClient.prototype = {
    _assign: function (opts) {
        var self = this;
        return new Promise(function (resolve, reject) {
            var req = http.request(url.parse(self.baseURL + "dir/assign?" + qs.stringify(opts)), function (res) {
                let body = "";

                res.setEncoding('utf8');
                res.on("data", function (chunk) {
                    body += chunk;
                });
                res.on("end", function () {
                    var json = JSON.parse(body);
                    return resolve(json);
                });
            });
            req.on("error", function (err) {
                return reject(err);
            });
            req.end();
        });
    },

    write: function (file, opts) {
        opts = opts || {};
        var self = this;
        if (file instanceof Array) {
            opts.count = file.length;

            for (var i = 0; i < opts.count; i++) {
                if (typeof file[i] === "string") {
                    file[i] = path.resolve(process.cwd(), file[i]);
                }
            }
        } else {
            opts.count = 1
            if (typeof file === "string") {
                file = path.resolve(process.cwd(), file);
            }
            file = [file];
        }

        //we do not pass the headers to assign, since everything in options
        //is serialized to a querystring
        let assignOpts = Object.assign({}, opts);
        delete assignOpts.headers;
        return self._assign(assignOpts).then(function (finfo) {
            if (finfo.error) {
                return Promise.reject(finfo.error);
            }

            var proms = [];
            for (var i = 0; i < opts.count; i++) {
                proms.push(new Promise(function (resolve, reject) {
                    var form = new FormData();
                    var stream = typeof file[i] === "string" ? fs.createReadStream(file[i]) : null;
                    form.append("file", stream ? stream : file[i]);

                    var urlParts = url.parse("http://" + (self.usePublicUrl ? finfo.publicUrl : finfo.url) + "/" + finfo.fid + (opts.count == 1 ? "" : "_" + i));
                    var options = Object.assign({}, urlParts);
                    if (opts.headers) {
                        options.headers = opts.headers;
                    }

                    var req = form.submit(options, function (err, res) {
                        if (err) {
                            return reject(err);
                        }
                        resolve(res);
                    });

                    //we only check for self created streams, stream errors from outside streams should be handled outside
                    if (stream) {
                        stream.on("error", function (err) {
                            reject(err);
                        });
                    }

                    req.on("error", function (err) {
                        reject(err);
                    });

                    req.on("socket", function (socket) {
                        socket.on("error", function (err) {
                            reject(err);
                        });
                    })
                }));
            }

            return Promise.all(proms).then(function () {
                return Promise.resolve(finfo);
            });

        });
    },

    find: function (fid, opts) {
        let self = this;
        return new Promise(function (resolve, reject) {
            let options = Object.assign({}, url.parse(self.baseURL + "dir/lookup?volumeId=" + fid));
            if (opts && opts.collection) {
                options.path += `&collection=${opts.collection}`
            }
            let req = http.request(options, function (res) {
                let body = "";
                let err;

                res.setEncoding('utf8');
                res.on('data', (chunk) => {
                    body += chunk;
                });

                res.on("end", function () {
                    var json = JSON.parse(body);
                    if (json.error) {
                        var err = new SeaweedFSError(json.error);
                        err.volumeId = json.volumeId;
                        return reject(err);
                    } else {
                        return resolve(json);
                    }
                });
            });
            req.on("error", function (err) {
                reject(err);
            });
            req.end();
        });
    },

    clusterStatus: function () {
        var self = this;
        return new Promise(function (resolve, reject) {
            var req = http.request(url.parse(self.baseURL + "cluster/status"), function (res) {
                let body = "";
                let err;

                res.setEncoding('utf8');
                res.on('data', (chunk) => {
                    body += chunk;
                });

                res.on("end", function () {
                    var json = JSON.parse(body);
                    if (json.error) {
                        var err = new SeaweedFSError(json.error);
                        err.volumeId = json.volumeId;
                        return reject(err);
                    } else {
                        return resolve(json);
                    }
                });
            });
            req.on("error", function (err) {
                reject(err);
            });
            req.end();
        });
    },

    masterStatus: function () {
        var self = this;
        return new Promise(function (resolve, reject) {
            var req = http.request(url.parse(self.baseURL + "cluster/status"), function (res) {
                let body = "";
                let err;

                res.setEncoding('utf8');
                res.on('data', (chunk) => {
                    body += chunk;
                });

                res.on("end", function () {
                    var json = JSON.parse(body);
                    if (json.error) {
                        var err = new SeaweedFSError(json.error);
                        err.volumeId = json.volumeId;
                        return reject(err);
                    } else {
                        return resolve(json);
                    }
                });
            });
            req.on("error", function (err) {
                reject(err);
            });
            req.end();
        });
    },

    systemStatus: function (cb) {
        var self = this;
        return new Promise(function (resolve, reject) {
            var req = http.request(url.parse(self.baseURL + "dir/status"), function (res) {
                let body = "";
                let err;

                res.setEncoding('utf8');
                res.on('data', (chunk) => {
                    body += chunk;
                });

                res.on("end", function () {
                    var json = JSON.parse(body);
                    if (json.error) {
                        var err = new SeaweedFSError(json.error);
                        err.volumeId = json.volumeId;
                        return reject(err);
                    } else {
                        return resolve(json);
                    }
                });
            });
            req.on("error", function (err) {
                reject(err);
            });
            req.end();
        });
    },

    volumeStatus: function (host) {
        return new Promise(function (resolve, reject) {
            var req = http.request(url.parse("http://" + host + "/status"), function (res) {
                let body = "";
                let err;

                res.setEncoding('utf8');
                res.on('data', (chunk) => {
                    body += chunk;
                });

                res.on("end", function () {
                    var json = JSON.parse(body);
                    if (json.error) {
                        var err = new SeaweedFSError(json.error);
                        err.volumeId = json.volumeId;
                        return reject(err);
                    } else {
                        return resolve(json);
                    }
                });
            });
            req.on("error", function (err) {
                reject(err);
            });
            req.end();
        });
    },

    read: function (fid, stream, opts) {
        var self = this;

        return self.find(fid, opts).then(function (res) {
            return new Promise(function (resolve, reject) {
                if (res.locations.length) {
                    let options = Object.assign({}, url.parse("http://" + (self.usePublicUrl ? res.locations[0].publicUrl : res.locations[0].url) + "/" + fid));
                    if (opts && opts.headers) {
                        options.headers = opts.headers;
                    }
                    let req = http.request(options, function (res) {
                        if (res.statusCode === 404) {
                            var err = new SeaweedFSError("file '" + fid + "' not found");
                            if (stream) {
                                stream.emit("error", err);
                            }
                            return reject(err);
                        }
                        if (stream) {
                            //support for http write streams
                            if (typeof stream.writeHead === 'function') {
                                stream.writeHead(res.statusCode, res.headers)
                            }
                            res.pipe(stream);
                            resolve(stream);
                        } else {
                            var tmp = [];
                            res.on("data", function (chunk) {
                                tmp.push(chunk);
                            });
                            res.on("end", function () {
                                var buffer = Buffer.concat(tmp);
                                resolve(buffer);
                            });
                        }
                    });
                    req.on("error", function (err) {
                        if (stream) {
                            stream.emit("error", err);
                        }
                        reject(err);
                    });
                    req.end();
                } else {
                    var err = new SeaweedFSError("No volume servers found for volume " + fid.split(",")[0]);
                    if (stream) {
                        stream.emit("error", err);
                    }
                    reject(err);

                }
            });
        });

    },

    remove: function (fid, opts) {
        var self = this;

        return self.find(fid, opts).then(function (result) {
            return new Promise(function (resolve, reject) {
                var proms = [];
                for (var i = 0, len = result.locations.length; i < len; i++) {
                    proms.push(new Promise(function (resolve, reject) {
                        var req = http.request(Object.assign(url.parse("http://" + (self.usePublicUrl ? result.locations[i].publicUrl : result.locations[i].url) + "/" + fid), {
                            "method": "DELETE"
                        }), function (res) {
                            if (res.statusCode === 404) {
                                var err = new SeaweedFSError("file '" + fid + "' not found");
                                return reject(err);
                            }
                            var tmp = [];
                            res.on("data", function (chunk) {
                                tmp.push(chunk);
                            });
                            res.on("end", function () {
                                var buffer = Buffer.concat(tmp);
                                var payload = JSON.parse(buffer.toString("utf-8"));

                                if (!payload.size) {
                                    return reject(new SeaweedFSError("File with fid " + fid + " could not be removed"));
                                }
                                resolve(payload);
                            });
                        });
                        req.on("error", function (err) {
                            reject(err);
                        });
                        req.end();
                    }));
                }
                Promise.all(proms).then(function () {
                    resolve({
                        count: result.locations.length
                    });
                }).catch(function (err) {
                    reject(err);
                });
            });
        });
    },

    vacuum: function (opts) {
        var self = this;
        opts = opts || {};
        return new Promise(function (resolve, reject) {
            var req = http.request(url.parse(self.baseURL + "vol/vacuum?" + qs.stringify(opts)), function (res) {
                var tmp = [];
                res.on("data", function (chunk) {
                    tmp.push(chunk);
                });
                res.on("end", function () {
                    var buffer = Buffer.concat(tmp);
                    resolve(JSON.parse(buffer.toString("utf8")));
                });
            });
            req.on("error", function (err) {
                reject(err);
            });
            req.end();
        });
    }
};

module.exports = WeedFSClient;