phplib/ESClient.php
<?php
namespace FOO;
/**
* Class ESClient
* Contains functionality for managing the Alert index.
* @package FOO
*/
class ESClient {
/** Number of Alerts to batch into a single request. */
const BATCH_SIZE = 5000;
/** Name of the ES mapping template. */
const MAPPING_TEMPLATE = '411_alerts_wildcard';
/** @var Search[] Mapping of ids to Search objects. **/
private $searches = [];
/** @var Alert[] List of pending Alerts. **/
private $list = [];
/** @var string Index name. **/
private $index;
/** @var \Elasticsearch\Client Client object. **/
private $client;
/**
* @param boolean $init Whether to initialize the ES index (if it doesn't exist).
*/
public function __construct($init=true) {
$this->index = self::getIndexName();
$this->client = self::getClient('alerts', true);
if($init) {
$this->initializeIndex();
}
}
/**
* Returns the name of the Alerts index.
* @return string Index name.
*/
public static function getIndexName() {
return '411_alerts_' . SiteFinder::getCurrentId();
}
/**
* Get an ES client.
* @param string $config_name The name of the es config key.
* @param bool $index Whether this client will be used for indexing.
* @return \Elasticsearch\Client The client object.
*/
public static function getClient($config_name='alerts', $index=false) {
$escfg = Config::get('elasticsearch')[$config_name];
$cb = \Elasticsearch\ClientBuilder::create();
if($index && count($escfg['index_hosts']) > 0) {
$cb->setHosts($escfg['index_hosts']);
} else if(count($escfg['hosts']) > 0) {
$cb->setHosts($escfg['hosts']);
}
if(!is_null($escfg['ssl_cert'])) {
$cb->setSSLVerification($escfg['ssl_cert']);
}
if(!is_null($escfg['ssl_client_key'])) {
$cb->setSSLKey($escfg['ssl_client_key']);
}
if(!is_null($escfg['ssl_client_cert'])) {
$cb->setSSLCert($escfg['ssl_client_cert']);
}
return $cb->build();
}
/**
* Initialize the index as necessary.
*/
public function initializeIndex() {
// Create template.
if(!$this->client->indices()->existsTemplate(['name' => self::MAPPING_TEMPLATE])) {
$version = explode('.', $this->client->info()['version']['number'])[0];
$string_type = 'string';
if ($version >= 6) {
$string_type = 'text';
}
$this->client->indices()->putTemplate([
'name' => self::MAPPING_TEMPLATE,
'body' => [
'template' => '411_alerts_*',
'mappings' => [
'_default_' => [
'properties' => [
'alert_date' => ['type' => 'date', 'format' => 'epoch_second'],
'assignee_type' => ['type' => 'long'],
'assignee' => ['type' => 'long'],
'content' => ['type' => 'object'],
'source' => ['type' => $string_type],
'source_id' => ['type' => $string_type],
'search_id' => ['type' => 'long'],
'state' => ['type' => 'long'],
'resolution' => ['type' => 'long'],
'escalated' => ['type' => 'boolean'],
'content_hash' => ['type' => $string_type],
'notes' => ['type' => $string_type],
'tags' => ['type' => $string_type],
'priority' => ['type' => 'long'],
'category' => ['type' => $string_type],
'owner' => ['type' => 'long'],
'create_date' => ['type' => 'date', 'format' => 'epoch_second'],
'update_date' => ['type' => 'date', 'format' => 'epoch_second'],
]
]
]
]
]);
}
// Create index.
if(!$this->client->indices()->exists(['index' => self::getIndexName()])) {
$this->client->indices()->create(['index' => self::getIndexName()]);
}
}
public function destroyIndex() {
if($this->client->indices()->existsTemplate(['name' => self::MAPPING_TEMPLATE])) {
$this->client->indices()->deleteTemplate(['name' => self::MAPPING_TEMPLATE]);
}
if($this->client->indices()->exists(['index' => self::getIndexName()])) {
$this->client->indices()->delete(['index' => self::getIndexName()]);
}
}
/**
* Search for Alerts in the index.
* @param string $query Query string query.
* @param int $from The lower time threshold.
* @param int $to The upper time threshold.
* @param int $offset The offset from the beginning of the result set.
* @param int $count The number of results to return.
* @return Alert[] An array of Alerts.
*/
public function getAlerts($query, $from, $to, $offset, $count) {
$result_set = $this->query($query, null, $from, $to, false, $offset, $count);
$ret = [];
foreach($result_set as $result) {
foreach($result['hits']['hits'] as $entry) {
$ret[] = self::format($entry);
}
}
return $ret;
}
/**
* Get a list of Alert ids matching the query.
* @param string $query Query string query.
* @param int $from The lower time threshold.
* @param int $to The upper time threshold.
* @return int[] An array of Alert ids.
*/
public function getIds($query, $from, $to) {
$result_set = $this->query($query, ['id'], $from, $to, true);
$ret = [];
foreach($result_set as $result) {
foreach($result['hits']['hits'] as $entry) {
$ret[] = $entry['fields']['id'][0];
}
}
return $ret;
}
private function query($query, $fields=null, $from=null, $to=null, $scroll=false, $offset=null, $count=null) {
$client = self::getClient();
$filter = [];
$conds = [];
if(!is_null($from)) {
$conds['gte'] = $from;
}
if(!is_null($to)) {
$conds['lt'] = $to;
}
if(count($conds) > 0) {
$filter[] = [
'range' => [ 'alert_date' => $conds ]
];
}
$filter[] = [
'query_string' => [ 'query' => $query ]
];
$body = [
'query' => [
'bool' => [
'filter' => $filter
],
],
'sort' => [ 'alert_date' => [ 'order' => 'desc', 'unmapped_type' => 'date' ] ]
];
if(!is_null($offset)) {
$body['from'] = $offset;
}
if(!is_null($count)) {
$body['size'] = $count;
}
if(!is_null($fields)) {
$body['fields'] = $fields;
}
$result_set = [];
try {
if($scroll) {
$response = $client->search([
'index' => $this->index,
'body' => $body,
'scroll' => '15s',
]);
$result_set[] = $response;
do {
if(!array_key_exists('_scroll_id', $response)) {
throw new \RuntimeException('No scroll id');
}
$response = $client->scroll([
'scroll_id' => $response['_scroll_id'],
'scroll' => '15s'
]);
$result_set[] = $response;
} while(count($response['hits']['hits']) > 0);
$client->clearScroll(['scroll_id' => $response['_scroll_id']]);
} else {
$result_set[] = $client->search([
'index' => $this->index,
'body' => $body,
]);
}
} catch(\Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
throw new \RuntimeException('Error executing query');
}
return $result_set;
}
/**
* Search for Alerts in the index. Return results grouped by several fields.
* @param string $query Query string query.
* @param int $from The lower time threshold.
* @param int $to The upper time threshold.
* @return array A structure containing Alert information.
*/
public function bootstrap($query, $from=null, $to=null) {
$client = self::getClient();
$fields = ['escalated', 'assignee_type', 'assignee', 'search_id', 'state'];
$aggs = [];
$node = &$aggs;
foreach($fields as $field) {
$node['aggs'] = ['agg' => ['terms' => [ 'field' => $field, 'size' => 2**31 - 1 ]]];
$node = &$node['aggs']['agg'];
}
$node['aggs'] = ['hits' => [
'top_hits' => [
'size' => 10,
'sort' => [ 'alert_date' => [ 'order' => 'desc', 'unmapped_type' => 'date' ] ]
]
]];
$filter = [];
$conds = [];
if(!is_null($from)) {
$conds['gte'] = $from;
}
if(!is_null($to)) {
$conds['lt'] = $to;
}
if(count($conds) > 0) {
$filter[] = [
'range' => [ 'alert_date' => $conds ]
];
}
$filter[] = ['query_string' => ['query' => $query]];
try {
$data = $client->search([
'index' => $this->index,
'body' => [
'query' => [
'bool' => [
'filter' => $filter
],
],
'size' => 0,
'aggs' => $aggs['aggs'],
]
]);
} catch(\Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
throw new \RuntimeException('Error executing query');
}
return $this->bootstrapRecurse([], $data['aggregations'], 0, array_reverse($fields));
}
private function bootstrapRecurse($node, $data, $count, $fields) {
$ret = [];
if(array_key_exists('agg', $data)) {
$key = array_pop($fields);
// Iterate over each bucket and continue recursing.
foreach($data['agg']['buckets'] as $sub_data) {
$val = $sub_data['key'];
// Clone fields (each iteration is independent).
$new_node = $node;
$new_node[$key] = $val;
$ret = array_merge($ret, $this->bootstrapRecurse($new_node, $sub_data, $sub_data['doc_count'], $fields));
}
} else if(array_key_exists('hits', $data)) {
return [['count' => $node + ['count' => $count], 'data' => array_map([$this, 'format'], $data['hits']['hits']['hits'])]];
}
return $ret;
}
/**
* Get counts of active Alerts.
* @return array Count data.
*/
public function getActiveAlertCounts() {
$client = self::getClient();
$filter = [
[
'terms' => [
'state' => [Alert::ST_NEW, Alert::ST_INPROG]
]
]
];
$aggs = [
'stt' => [
'terms' => [ 'field' => 'state' ]
],
'esc' => [
'terms' => [
'field' => 'escalated',
],
'aggs' => ['prio' => [
'terms' => [
'field' => 'priority'
]
]]
],
'stl' => [
'date_range' => [
'field' => 'update_date',
'format' => 'x',
'ranges' => [
['to' => 'now-7d']
]
]
]
];
try {
$data = $client->search([
'index' => $this->index,
'body' => [
'query' => [
'bool' => [
'filter' => $filter
],
],
'size' => 0,
'aggs' => $aggs
]
]);
$states = [0, 0];
foreach($data['aggregations']['stt']['buckets'] as $row) {
if(array_key_exists($row['key'], $states)) {
$states[$row['key']] = $row['doc_count'];
}
}
$priorities = [0, 0, 0];
$escalated = [0];
foreach($data['aggregations']['esc']['buckets'] as $row) {
if($row['key'] == 0) {
foreach($row['prio']['buckets'] as $sub_row) {
if(array_key_exists($sub_row['key'], $priorities)) {
$priorities[$sub_row['key']] = $sub_row['doc_count'];
}
}
} else {
$escalated[0] = $row['doc_count'];
}
}
$stale = [0];
if(count($data['aggregations']['stt']['buckets']) > 0) {
$stale[0] = $data['aggregations']['stt']['buckets'][0]['doc_count'];
}
$data = array_merge($priorities, $escalated, $states, $stale);
} catch(\Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
throw new \RuntimeException('Error getting active count data');
}
return $data;
}
/**
* Get counts of new Alerts grouped by date.
* @param int $range The number of days to return data for.
* @param int $search_id The id of the search to return data for.
* @return array Count data.
*/
public function getAlertActivityCounts($range, $search_id=0) {
$client = self::getClient();
$filter = [
[
'range' => [
'create_date' => [
'lt' => 'now',
'gte' => sprintf('now-%dd/d', $range)
]
]
]
];
if($search_id != 0) {
$filter[] = [
'term' => [
'search_id' => $search_id
]
];
}
$aggs = ['agg' => [
'date_histogram' => [
'field' => 'create_date',
'interval' => '1d',
'format' => 'yyyy-MM-dd',
]
]];
try {
$data = $client->search([
'index' => $this->index,
'body' => [
'query' => [
'bool' => [
'filter' => $filter
],
],
'size' => 0,
'aggs' => $aggs
]
]);
} catch(\Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
throw new \RuntimeException('Error getting activity count data');
}
$ret = [];
$date = new \DateTime('@' . $_SERVER['REQUEST_TIME']);
$date->sub(new \DateInterval(sprintf('P%dD', $range)));
for($i = 0; $i < $range; ++$i) {
$date_str = $date->format('Y-m-d');
$ret[$date_str] = [$date_str, 0];
$date->add(new \DateInterval('P1D'));
}
foreach($data['aggregations']['agg']['buckets'] as $x) {
$ret[$x['key_as_string']] = [$x['key_as_string'], $x['doc_count']];
}
return array_values($ret);
}
/**
* Format result objects from ES.
* @param array $data Alert data.
* @return array Formatted Alert data.
*/
public static function format($data) {
return $data['_source'];
}
/**
* Register an Alert to be updated.
* @param Alert $alert The Alert to update.
*/
public function update(Alert $alert) {
$this->list[] = [[
'index' => [
'_index' => $this->index,
'_type' => $alert->getSearch(true)['type'] ?: 'null',
'_id' => $alert['alert_id'],
]
], $alert];
if(count($this->list) > self::BATCH_SIZE) {
$this->send();
}
}
/**
* Register an Alert to be deleted.
* @param Alert $alert The Alert to update.
*/
public function delete(Alert $alert) {
$this->list[] = [[
'delete' => [
'_index' => $this->index,
'_type' => $alert['type'],
'_id' => $alert['alert_id'],
]
]];
if(count($this->list) > self::BATCH_SIZE) {
$this->send();
}
}
/**
* Finish processing any remaining Alerts.
*/
public function finalize() {
$this->send();
}
private function send() {
if(count($this->list) == 0) {
return;
}
$list = [];
foreach($this->list as $alert_data) {
// Each entry in the list has at least one element. Pop that into the list.
$list[] = $alert_data[0];
// If there's just one element, we're done (it was a delete). Otherwise, insert additional data.
if(count($alert_data) == 1) {
continue;
}
$search_id = $alert_data[1]['search_id'];
if(!Util::exists($this->searches, $search_id)) {
$this->searches[$search_id] = SearchFinder::getById($search_id, true);
}
$list[] = $this->generateAlertData(
$alert_data[1], Util::get($this->searches, $search_id)
);
}
$resp = $this->client->bulk([
'body' => $list
]);
$this->list = [];
}
private function generateAlertData(Alert $alert, Search $search=null) {
$data = $alert->toArray();
$data['content'] = $this->unflatten((array)$data['content']);
// Populate search data.
$search_data = [
'tags' => Util::get($search, 'tags', []),
'priority' => Util::get($search, 'priority', Search::P_LOW),
'category' => Util::get($search, 'category', Search::$CATEGORIES['general']),
'owner' => Util::get($search, 'owner', 0),
'source' => Util::get($search, 'source', ''),
];
// Populate note data.
$alertlogs = AlertLogFinder::getByQuery([
'alert_id' => $data['id'],
'action' => [
ModelFinder::C_NEQ => AlertLog::A_CREATE
]]);
$notes = [];
foreach($alertlogs as $alertlog) {
if(strlen($alertlog['note']) > 0) {
$notes[] = $alertlog['note'];
}
}
return array_merge($data, $search_data, ['notes' => $notes]);
}
public function unflatten(array $data) {
$ret = [];
foreach($data as $key=>$val) {
$path = explode('.', $key);
$last = array_pop($path);
$node = &$ret;
foreach($path as $nkey) {
$node = &$node[$nkey];
}
$node[$last] = $val;
}
return $ret;
}
}