// lib/text/frequency.js
/*
* Frequency Collection
* @author exabugs@gmail.com
*/
var _ = require('underscore')
, fs = require('fs')
, mongodb = require('mongodb')
, ObjectID = mongodb.ObjectID
, async = require('async')
, extend = require('../extend')
, Util = require('../vector/util')
, R = require('../R')
, path = require('path')
;
/**
 * Builds a Util instance that carries its helpers as own properties so it
 * can be handed to mapReduce as a `scope` value.
 * @param field field names passed straight to the Util constructor
 * @returns {Util}
 */
function getUtil(field) {
  var helper = new Util(field);
  // The prototype is not usable through the mapReduce scope parameter, so the
  // prototype members are copied onto the instance first; the extend helpers
  // are copied afterwards because the scoped code uses them as well.
  [Util.prototype, extend].forEach(function (source) {
    extend.extend(helper, source);
  });
  return helper;
}
exports.tfiof = function (db, target, freq, field, callback) {
// 検索条件
var condition = target.option.condition || {};
var collection = db.collection(target.collection);
condition[target.attribute] = {$exists: 1};
var default_value = target.default || 1; // freqcollに単語が存在しない場合の値をいくつにするか
var f = {};
f[target.attribute] = 1;
var cursor = collection.find(condition, {fields: f});
cursor.count(function (err, count) {
if (err || count === 0) {
callback(err);
}
var c = count;
cursor.each(function (err, item) {
if (item !== null) {
var x = extend.getValue(item, target.attribute);
var freqcoll = db.collection(freq[target.attribute]);
exports.to_tfiof(x, field, freqcoll, default_value, function (err, x) {
var obj = {};
obj[target.attribute] = x;
collection.findAndModify({_id: item._id}, null, {$set: obj}, function (err) {
if (err || --c === 0) {
callback(err, count);
}
});
});
}
});
});
}
exports.to_tfiof = function (tf_array, field, of_coll, default_value, callback) {
var util = new Util([field[0], field[2]]);
var words = _.pluck(tf_array, field[0]);
of_coll.find({_id: {$in: words}}).toArray(function (err, array) {
merge(tf_array, field, array, ['_id', 'value']);
if (default_value) {
_.each(tf_array, function (item) {
if (!item[field[2]]) {
item[field[2]] = item[field[1]] * default_value;
}
});
}
util.normalize(tf_array);
callback(err, tf_array);
});
}
/**
 * Single-pass join of two lists that are walked in step by key.
 *
 * Whenever the key of the current `a` entry (`af[0]`) equals the key of the
 * current `b` entry (`bf[0]`), `a[i][af[2]]` is set to
 * `a[i][af[1]] * b[j][bf[1]]` and both positions advance. Otherwise the side
 * holding the smaller key advances. `a` is modified in place and returned.
 *
 * @param a  entries to update
 * @param af [key field, source field, destination field] of `a`
 * @param b  entries providing the multiplier
 * @param bf [key field, value field] of `b`
 * @returns the array `a`
 */
function merge(a, af, b, bf) {
  for (var i = 0, j = 0; i < a.length && j < b.length;) {
    var left = a[i];
    var right = b[j];
    var leftKey = left[af[0]];
    var rightKey = right[bf[0]];
    if (leftKey != rightKey) {
      // Skip whichever side is behind.
      if (leftKey > rightKey) {
        j++;
      } else {
        i++;
      }
      continue;
    }
    left[af[2]] = left[af[1]] * right[bf[1]];
    i++;
    j++;
  }
  return a;
}
/**
* 単語が出現するオブジェクトの個数を求める
* @param db
* @param info
* @param field
* @param callback
*/
exports.object_frequency = function (db, info, freq, field, callback) {
// 検索条件
var condition = info.option.condition || {};
var collection = db.collection(info.collection);
condition[info.attribute] = {$exists: 1};
// ユーティリティ関数
var util = {};
extend.extend(util, extend);
collection.find(condition, {fields: {}}).count(function (err, total) {
// パラメータ
var params = {
attribute: info.attribute,
field: field,
total: total
};
// MapReduce
collection.mapReduce(
function () {
_.getValue(this, $.attribute).forEach(function (item) {
emit(item[$.field[0]], 1);
});
},
function (key, values) {
return Array.sum(values);
},
{
scope: {_: util, $: params},
finalize: function (key, value) {
return Math.log($.total / value);
},
query: condition,
out: freq[info.attribute] || {inline: 1}
},
function (err, results) {
callback(err, results);
}
);
});
}
/**
* 単語が出現する回数を求める
* @param collection
* @param attribute
* @param field
* @param option
* @param callback
*/
exports.term_frequency = function (db, info, field, callback) {
// 検索条件
var condition = info.option.condition || {};
condition[info.attribute] = {$exists: 1};
// ユーティリティ関数
var util = getUtil(field);
// パラメータ
var params = {
attribute: info.attribute,
field: field
};
db.collection(info.collection).mapReduce(
function () {
_.getValue(this, $.attribute).forEach(function (item) {
emit(item[$.field[0]], item[$.field[1]]);
});
},
function (key, values) {
return Array.sum(values);
},
{
scope: {_: util, $: params},
query: condition,
out: info.option.out || {inline: 1}
},
function (err, results) {
callback(err, results);
}
);
}
/**
* 集計して上位のオブジェクトに集約する
* @param source
* @param target
* @param field
* @param callback
*/
exports.countup = function (db, target, source, field, callback) {
var condition = target.option.condition || {};
var collection = db.collection(target.collection);
var index = {};
index[[target.attribute, field[0]].join('.')] = 1;
collection.ensureIndex(index, function (err) {
var cursor = collection.find(condition, {fields: {}});
cursor.count(function (err, count) {
if (err || count == 0) {
return callback(err, count);
}
cursor.each(function (err, item) {
if (item) {
source.option.condition = source.option.condition || {};
source.option.condition[source.key] = {$in: [item._id]};
exports.term_frequency(db, source, field, function (err, results) {
var array = [];
results.forEach(function (result) {
var item = {};
item[field[0]] = result._id;
item[field[1]] = result.value;
array.push(item);
});
var update = {};
update[target.attribute] = array;
collection.findAndModify({_id: item._id}, [], {$set: update}, function (err) {
if (--count === 0) {
return callback(err);
}
});
});
}
});
});
});
}
/**
* 相互類似度
* @param db
* @param source
* @param field
* @param callback
*/
exports.mutualize = function (db, source, field, callback) {
var condition = source.option.condition || {};
var collection = db.collection(source.collection);
var fields = {};
fields[source.attribute] = 1;
var option = {
fields: fields,
sort: {_id: 1}
}
// ユーティリティ関数
var util = getUtil([field[0], field[2]]);
db.collection(source.option.out).drop();
var cursor = collection.find(condition, option);
cursor.count(function (err, count) {
if (err || count == 0) {
return callback(err, count);
}
cursor.each(function (err, item) {
if (item) {
// パラメータ
var params = {
attribute: source.attribute,
_id: item._id,
vector: extend.getValue(item, source.attribute),
field: field
};
collection.mapReduce(
function () {
var cosine = _.intersect($.vector, _.getValue(this, $.attribute));
emit({A: $._id, B: this._id}, cosine);
},
function (key, values) {
return values[0];
},
{
scope: {_: util, $: params},
query: {_id: {$gt: item._id}},
out: source.option.out ? {merge: source.option.out} : {inline: 1}
},
function (err, results) {
if (--count === 0) {
return callback(err, results);
}
}
);
}
});
});
}
/**
* 主座標分析
* →「R」が実行可能な場合、類似検索結果に対して主座標分析を行うことができる。
* @param db
* @param target
* @param source
* @param callback
var target = {
collection: 'mails.search.result',
attribute: 'tf',
option: {
field: ['x, 'y']
}
};
var source = {
collection: 'mails.mutual',
option: {
}
};
*/
exports.cmdscale = function (db, target, source, callback) {
var field = target.option.field || ['x', 'y'];
var target_collection = db.collection(target.collection);
var score = target.attribute + '.score';
var target_sort = {};
target_sort[score] = -1; // スコア降順
var target_condition = target.option.condition || {};
var target_option = {fields: {'_id': 1}, sort: target_sort, limit: 200};
target_collection.find(target_condition, target_option).toArray(function (err, results) {
if (err) {
callback(err);
} else {
// 類似度 読込
var source_collection = db.collection(source.collection);
var ids = _.pluck(results, '_id');
var source_condition = source.option.condition || {};
source_condition['_id.A'] = {$in: ids};
source_condition['_id.B'] = {$in: ids};
source_collection.find(source_condition).toArray(function (err, results) {
if (err) {
callback(err);
} else {
var map = _.reduce(ids, function (memo, item) {
memo[item] = {};
return memo;
}, {});
map = _.reduce(results, function (memo, item) {
var _id = item._id;
memo[_id.A][_id.B] = item.value;
return memo;
}, map);
// R 入力データ作成
var data = [];
data.push('\t' + ids.join('\t'));
_.each(ids, function (id_A) {
var line = [];
line.push(id_A);
_.each(ids, function (id_B) {
var value = (id_A === id_B) ? 1 : map[id_A][id_B];
line.push(value === undefined ? '' : (1 - value));
});
data.push(line.join('\t'));
});
data.push('');
// R 入力データ書込
var tmpfilename = (new ObjectID()).toString();
var tmpdir = '/tmp';
var ifile = path.join(tmpdir, tmpfilename + '.in');
var ofile = path.join(tmpdir, tmpfilename + '.out');
fs.writeFile(ifile, data.join('\n'), function (err) {
if (err) {
callback(err);
} else {
// R 実行
var script = path.resolve(path.join('lib', 'R', 'scripts', 'sammon.R'));
R.exec(script, [ifile, ofile], function (err, stdout, stderr) {
fs.unlink(ifile);
if (!path.existsSync(ofile)) {
console.log(stderr);
callback(err);
} else {
// R 結果読込
fs.readFile(ofile, function (err, text) {
if (err) {
callback(err);
} else {
fs.unlink(ofile);
var index = 0;
async.each(text.toString().split('\n'), function (element, next) {
var _id = ids[index++];
var value = element.split(' ');
// R 結果書込
var doc = {};
doc[target.attribute + '.' + field[0]] = parseFloat(value[0]);
doc[target.attribute + '.' + field[1]] = parseFloat(value[1]);
target_collection.findAndModify({_id: _id}, null, {$set: doc}, function (err, result) {
next();
});
}, function (err) {
callback(err);
});
}
});
}
});
}
})
}
});
}
});
}