sentilo/sentilo

View on GitHub
sentilo-platform/sentilo-platform-service/src/main/java/org/sentilo/platform/service/impl/DataServiceImpl.java

Summary

Maintainability
A
1 hr
Test Coverage
/*
 * Sentilo
 *
 * Original version 1.4 Copyright (C) 2013 Institut Municipal d’Informàtica, Ajuntament de
 * Barcelona. Modified by Opentrends adding support for multitenant deployments and SaaS.
 * Modifications on version 1.5 Copyright (C) 2015 Opentrends Solucions i Sistemes, S.L.
 *
 *
 * This program is licensed and may be used, modified and redistributed under the terms of the
 * European Public License (EUPL), either version 1.1 or (at your option) any later version as soon
 * as they are approved by the European Commission.
 *
 * Alternatively, you may redistribute and/or modify this program under the terms of the GNU Lesser
 * General Public License as published by the Free Software Foundation; either version 3 of the
 * License, or (at your option) any later version.
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied.
 *
 * See the licenses for the specific language governing permissions, limitations and more details.
 *
 * You should have received a copy of the EUPL1.1 and the LGPLv3 licenses along with this program;
 * if not, you may find them at:
 *
 * https://joinup.ec.europa.eu/software/page/eupl/licence-eupl http://www.gnu.org/licenses/ and
 * https://www.gnu.org/licenses/lgpl.txt
 */
package org.sentilo.platform.service.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.sentilo.common.enums.EventType;
import org.sentilo.common.enums.SensorState;
import org.sentilo.platform.common.domain.DataInputMessage;
import org.sentilo.platform.common.domain.Observation;
import org.sentilo.platform.common.domain.Sensor;
import org.sentilo.platform.common.exception.EventRejectedException;
import org.sentilo.platform.common.exception.RejectedResourcesContext;
import org.sentilo.platform.common.exception.ResourceNotFoundException;
import org.sentilo.platform.common.exception.ResourceOfflineException;
import org.sentilo.platform.common.service.DataService;
import org.sentilo.platform.common.service.PublishService;
import org.sentilo.platform.common.service.ResourceService;
import org.sentilo.platform.service.monitor.Metric;
import org.sentilo.platform.service.monitor.RequestType;
import org.sentilo.platform.service.utils.QueryFilterParamsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;

@Service
public class DataServiceImpl extends AbstractPlatformServiceImpl implements DataService {

  private static final Logger LOGGER = LoggerFactory.getLogger(DataServiceImpl.class);

  @Autowired
  private ResourceService resourceService;

  @Autowired
  private PublishService publishService;

  @Value("${sentilo.server.api.data.reject-unknown-sensors:true}")
  private boolean rejectUnknownSensors;

  /*
   * (non-Javadoc)
   *
   * @see
   * org.sentilo.platform.common.service.DataService#setObservations(org.sentilo.platform.common
   * .domain.DataInputMessage)
   */
  @Override
  @Metric(requestType = RequestType.PUT, eventType = EventType.DATA)
  public void setObservations(final DataInputMessage message) {
    final List<Observation> observations = message.getObservations();
    final RejectedResourcesContext rejectedContext = new RejectedResourcesContext();

    for (final Observation observation : observations) {
      try {
        final Sensor sensor = getSensorMetadata(observation.getProvider(), observation.getSensor());
        checkTargetResourceState(sensor, observation);
        setObservation(sensor, observation);
      } catch (final ResourceNotFoundException rnfe) {
        rejectedContext.rejectEvent(observation.getSensor(), rnfe.getMessage());
        LOGGER.warn("Observation [{}] has been rejected because sensor [{}], belonging to provider [{}], doesn't exist on Sentilo.",
            observation.getValue(), observation.getSensor(), observation.getProvider());
      } catch (final ResourceOfflineException roe) {
        rejectedContext.rejectEvent(observation.getSensor(), roe.getMessage());
        LOGGER.warn("Observation [{}] has been rejected because sensor [{}], belonging to provider [{}], is not online.", observation.getValue(),
            observation.getSensor(), observation.getProvider());
      }
    }

    if (!rejectedContext.isEmpty()) {
      throw new EventRejectedException(EventType.DATA, rejectedContext);
    }
  }

  /*
   * (non-Javadoc)
   *
   * @see
   * org.sentilo.platform.common.service.DataService#deleteLastObservations(org.sentilo.platform
   * .common.domain.DataInputMessage)
   */
  @Override
  public void deleteLastObservations(final DataInputMessage message) {
    if (StringUtils.hasText(message.getSensorId())) {
      deleteLastObservation(message.getProviderId(), message.getSensorId());
    } else {
      deleteLastObservations(message.getProviderId());
    }
  }

  /*
   * (non-Javadoc)
   *
   * @see
   * org.sentilo.platform.common.service.DataService#getLastObservations(org.sentilo.platform.common
   * .domain.DataInputMessage)
   */
  @Override
  @Metric(requestType = RequestType.GET, eventType = EventType.DATA)
  public List<Observation> getLastObservations(final DataInputMessage message) {
    // Para recuperar las observaciones del sensor / sensores de un proveedor, debemos hacer lo
    // siguiente:
    // 1. Recuperar los identificadores internos de los sensores de los cuales queremos recuperar
    // las observaciones.
    // 2. Para cada sensor, recuperar las observaciones que cumplen el criterio de busqueda
    final List<Observation> globalObservations = new ArrayList<Observation>();
    final Set<String> sids = resourceService.getSensorsToInspect(message.getProviderId(), message.getSensorId());
    if (CollectionUtils.isEmpty(sids)) {
      LOGGER.debug("Provider [{}] has not sensors registered", message.getProviderId());
      return globalObservations;
    }

    LOGGER.debug("Retrieving last observations for {} sensors belonging to provider [{}]", sids.size(), message.getProviderId());

    final Iterator<String> it = sids.iterator();
    while (it.hasNext()) {
      final String sid = it.next();
      final List<Observation> observationsFromSensor = getLastObservations(sid, message);
      if (!CollectionUtils.isEmpty(observationsFromSensor)) {
        globalObservations.addAll(observationsFromSensor);
      }
    }

    return globalObservations;
  }

  private void deleteLastObservations(final String providerId) {
    final Set<String> sids = resourceService.getSensorsFromProvider(providerId);

    if (CollectionUtils.isEmpty(sids)) {
      LOGGER.debug("Provider [{}] has not sensors registered", providerId);
      return;
    }

    LOGGER.debug("Found {} sensors belonging to provider [{}]", sids.size(), providerId);
    final Iterator<String> it = sids.iterator();
    while (it.hasNext()) {
      final String sid = it.next();
      deleteLastObservation(new Long(sid));
      LOGGER.debug("Removed last observation from sensor with sid {} and belonging to provider [{}]", sid, providerId);
    }
  }

  private void deleteLastObservation(final String providerId, final String sensorId) {
    final Optional<Long> sid = sequenceUtils.getSid(providerId, sensorId);
    if (!sid.isPresent()) {
      // Si no hay identificador interno del sensor, entonces este no tiene ninguna observacion
      // registrada.
      return;
    }

    deleteLastObservation(sid.get());

    LOGGER.debug("Removed last observation from sensor [{}] belonging to provider [{}]", sensorId, providerId);
  }

  private void setObservation(final Sensor sensor, final Observation data) {
    registerSensorData(sensor, data);
    publishService.publish(data);
  }

  private List<Observation> getLastObservations(final String sid, final DataInputMessage message) {
    final Long to = QueryFilterParamsUtils.getTo(message);
    final Long from = QueryFilterParamsUtils.getFrom(message);
    final Integer limit = QueryFilterParamsUtils.getLimit(message);

    // Because sensor events may each have a different expire time, and is likely to have some
    // entries already expired (i.e. the associated sdid:{sdid} entry is expired),
    // the process read from the ZSET more than limit entries (limit + 1) : it improves the
    // performance to return the limit observations requested by the
    // client because additional reads are made only if exists these additional entries.

    final List<Observation> observations = new ArrayList<Observation>();
    boolean readMore = true;
    // To evict a situation of no return if database is inconsistent (f.e. many of the entries in
    // ZSETs
    // are related with data already expired), we limit the maximum number
    // of iterations
    final int MAX_ITERATIONS = 10;
    int iteration = 1;

    while (readMore) {
      final int offset = (iteration - 1) * limit;
      final int count = limit + 1;
      // Redis call is: ZREVRANGEBYSCORE sid:{sid}:observations to from LIMIT 0 limit
      final String redisZsetKey = keysBuilder.getSensorObservationsKey(sid);
      final Set<String> sdids = sRedisTemplate.zRevRangeByScore(redisZsetKey, to, from, offset, count);

      // As count=limit+1 and client only request limit elements, sdids set is subset to contain a
      // maximum of limit elements
      if (!CollectionUtils.isEmpty(sdids)) {
        final Set<String> sdidsToEval = sdids.size() < count ? sdids : ImmutableSet.copyOf(Iterables.limit(sdids, limit));
        final List<String> orphanEvents = addObservations(sdidsToEval, observations, limit);
        resourceService.addOrphanEventsToRemove(EventType.DATA, redisZsetKey, orphanEvents);
      }

      readMore = observations.size() < limit && !CollectionUtils.isEmpty(sdids) && sdids.size() > limit && iteration < MAX_ITERATIONS;
      iteration++;
    }

    return observations;
  }

  private List<String> addObservations(final Set<String> sdids, final List<Observation> observations, final Integer limit) {
    final List<String> orphanEvents = new ArrayList<String>();
    final Iterator<String> it = sdids.iterator();

    while (it.hasNext() && observations.size() < limit) {
      final String sSdid = it.next();
      final Long sdid = Long.parseLong(sSdid);
      final Observation observation = getObservation(sdid);
      if (observation != null) {
        observations.add(observation);
      } else {
        orphanEvents.add(sSdid);
      }
    }

    return orphanEvents;
  }

  private Observation getObservation(final Long sdid) {
    Observation observation = null;
    String sid = null;
    String value = null;
    String ts = null;
    String location = null;

    final Map<String, String> infoSdid = sRedisTemplate.hGetAll(keysBuilder.getObservationKey(sdid));
    if (!CollectionUtils.isEmpty(infoSdid)) {
      value = infoSdid.get(DATA);
      ts = infoSdid.get(TIMESTAMP);
      sid = infoSdid.get(SID);
      location = infoSdid.get(LOCATION);
    }

    if (StringUtils.hasText(sid)) {
      final Optional<Sensor> sensor = resourceService.getSensor(Long.parseLong(sid));
      if (sensor.isPresent()) {
        observation = new Observation(sensor.get().getProvider(), sensor.get().getSensor(), value, Long.parseLong(ts), location);
      } else {
        LOGGER.warn("Not found in Redis a sensor with sid {}", sid);
      }
    }

    return observation;
  }

  private void deleteLastObservation(final Long sid) {
    // Para eliminar la ultima observacion de un sensor lo que debemos hacer es lo siguiente:
    // 1. Recuperamos el ultimo elemento del Sorted Set de observaciones del sensor (i.e., el que
    // tiene score mas alto).
    // 2. Eliminamos este elemento del Sorted Set.
    // 3. Eliminamos la clave sdid:{sdid}
    final String sensorObservationsKey = keysBuilder.getSensorObservationsKey(sid);
    final Set<String> sdids = sRedisTemplate.zRange(sensorObservationsKey, -1, -1);
    if (!CollectionUtils.isEmpty(sdids)) {
      sRedisTemplate.zRemRangeByRank(sensorObservationsKey, -1, -1);
      final String sdid = sdids.iterator().next();
      sRedisTemplate.del(keysBuilder.getObservationKey(sdid));
    }
  }

  /**
   * Checks if the sensor exists in Redis and if it is enabled. Otherwise throws an exception.
   */
  private void checkTargetResourceState(final Sensor sensor, final Observation data) throws ResourceNotFoundException, ResourceOfflineException {
    final boolean existsSensor = sensor.getState() != null && !SensorState.ghost.equals(sensor.getState());

    if (!existsSensor && rejectUnknownSensors) {
      throw new ResourceNotFoundException(data.getSensor(), "Sensor");
    } else if (!existsSensor) {
      publishService.publishGhostSensorAlarm(data);
    } else if (SensorState.offline.equals(sensor.getState())) {
      throw new ResourceOfflineException(data.getSensor(), "Sensor");
    }
  }

  private void registerSensorData(final Sensor sensor, final Observation data) {
    // final Long sid = sequenceUtils.getSid(data.getProvider(), data.getSensor());
    final Long sid = sensor.getSid();
    final Long sdid = sequenceUtils.getNewSdid();
    final Long timestamp = data.getTimestamp();
    final String location = StringUtils.hasText(data.getLocation()) ? data.getLocation() : "";

    // Guardamos una hash de clave sdid:{sdid} y valores sid, data (aleatorio), timestamp y
    // location.
    final String obsKey = keysBuilder.getObservationKey(sdid);
    final Map<String, String> fields = new HashMap<String, String>();
    fields.put(SID, Long.toString(sid));
    fields.put(DATA, data.getValue());
    fields.put(TIMESTAMP, timestamp.toString());
    fields.put(LOCATION, location);
    sRedisTemplate.hmSet(obsKey, fields);

    // if expired time in seconds (ttl) is defined and !=0, set the expire time to the key
    final int ttl = ttlToExpiredTime(sensor.getTtl());
    if (ttl != 0) {
      sRedisTemplate.expire(obsKey, ttl);
    }

    // Y definimos una reverse lookup key con la cual recuperar rapidamente las observaciones de un
    // sensor.
    // A continuacion, añadimos el sdid al Sorted Set sensor:{sid}:observations. La puntuacion, o
    // score, que se asocia a cada elemento del Set es el timestamp de la observacion.
    sRedisTemplate.zAdd(keysBuilder.getSensorObservationsKey(sid), timestamp, sdid.toString());

    LOGGER.debug("Registered in Redis observation [{}] for sensor [{}] belonging to provider [{}]", sdid, data.getSensor(), data.getProvider());
  }

  private Sensor getSensorMetadata(final String provider, final String sensorId) {
    // Save both the provider and the sensor in Redis only if rejectUnknownSensors is false. By
    // default, in this context, if sensor doesn't exist, it is created as a ghost sensor
    final Optional<Sensor> sensor = resourceService.getSensor(provider, sensorId, !rejectUnknownSensors);
    return sensor.isPresent() ? sensor.get() : new Sensor(provider, sensorId);
  }

}