packages/data-collector/src/data-collector.js
import {EconomicEntityFactory, CollectionOperationType} from '@thinkdeep/model';
import {validString} from '@thinkdeep/util';
import {Client} from './client.js';
import {Command, Option} from 'commander';
import {DataScraper} from './data-scraper.js';
import {Kafka} from 'kafkajs';
import log4js from 'log4js';
import moment from 'moment';
import sinon from 'sinon';
import {TwitterApi} from 'twitter-api-v2';
const logger = log4js.getLogger();
logger.level = 'debug';
try {
const program = new Command();
program
.name('collect-data')
.description('Collect data using the specified operation.');
program.addOption(
new Option(
'-e, --economic-entity <economic entity>',
`Specify the economic entity (i.e, '{ "name": "Google", "type": "BUSINESS"}').`
)
);
program.addOption(
new Option(
'-o, --operation-type <operation type>',
'Specify the type of data collection operation you would like to execute.'
).choices(CollectionOperationType.types)
);
program.addOption(
new Option(
'-l, --limit [limit]',
'Specify the limit associated with the operation.'
).default(10, 'Defaults to 10.')
);
program.addOption(
new Option(
'-m, --mock-data <mock data>',
'Trigger mocking of the cli.'
).default({}, 'An empty object')
);
program.parse(process.argv);
const options = program.opts();
const economicEntity = EconomicEntityFactory.get(
JSON.parse(options.economicEntity)
);
if (!validString(options.operationType))
throw new Error('Operation type is required');
const currentUtcDateTime = moment().utc().format();
let kafkaClient;
let twitterClient;
if (!options.mockData || Object.keys(options.mockData).length <= 0) {
logger.info(`Creating kafka client.`);
kafkaClient = new Kafka({
clientId: 'collect-data',
brokers: [
`${process.env.PREDECOS_KAFKA_HOST}:${process.env.PREDECOS_KAFKA_PORT}`,
],
});
twitterClient = new TwitterApi(process.env.PREDECOS_TWITTER_BEARER)
.readOnly;
} else {
logger.info(`Creating mock kafka client.`);
kafkaClient = {
admin: sinon.stub().returns({
connect: sinon.stub(),
createTopics: sinon.stub(),
disconnect: sinon.stub(),
}),
producer: sinon.stub().returns({
connect: sinon.stub(),
send: sinon.stub(),
disconnect: sinon.stub(),
}),
};
twitterClient = {
v2: {
get: sinon.stub().returns({
data: JSON.parse(options.mockData),
}),
},
};
}
const collectDataClient = new Client(twitterClient, kafkaClient, logger);
switch (options.operationType) {
case CollectionOperationType.FetchTweets: {
logger.info('Fetching tweets.');
(async () => {
logger.info('Connecting to data collection client.');
await collectDataClient.connect();
const recentTweets = await collectDataClient.fetchRecentTweets({
query: `${options.entityName} lang:en -is:retweet`,
max_results: options.limit,
});
logger.debug(
`Retrieved the following tweets: ${JSON.stringify(recentTweets)}`
);
const data = {
utcDateTime: currentUtcDateTime,
economicEntity,
tweets: recentTweets,
};
await collectDataClient.emitEvent('TWEETS_FETCHED', data);
process.exit(0);
})();
break;
}
case CollectionOperationType.ScrapeData: {
(async () => {
logger.info('Connecting to data collection client.');
await collectDataClient.connect();
logger.info(
`Scraping data for ${economicEntity.type} ${economicEntity.name}.`
);
const scraper =
!options.mockData || Object.keys(options.mockData).length <= 0
? new DataScraper(logger)
: sinon.createStubInstance(DataScraper);
const scrapedData = await scraper.scrapeData(economicEntity);
const data = {
utcDateTime: currentUtcDateTime,
economicEntity,
data: scrapedData,
};
await collectDataClient.emitEvent('DATA_SCRAPED', data);
})();
break;
}
default: {
throw new Error(
`The specified operation ${options.operationType} isn't yet supported.`
);
}
}
} catch (e) {
logger.error(e.message.toString());
throw e;
}