cuebook/CueObserve

View on GitHub
api/dbConnections/bigquery.py

Summary

Maintainability
A
0 mins
Test Coverage
F
28%
import json
import logging
from google.cloud import bigquery
from google.oauth2 import service_account

from dbConnections.utils import limitSql

logger = logging.getLogger(__name__)


class BigQuery:
    """
    Class to support functionalities on bigquery connection
    """

    @staticmethod
    def _getClient(params: dict):
        """
        Builds a BigQuery client from the connection params
        :param params: connection params; "file" holds the service account
            credentials as a JSON string (an already parsed dict is accepted too)
        :returns: bigquery.Client created with the credentials and their project_id
        :raises ValueError: when credentials are missing, are not valid JSON
            or do not decode to a JSON object
        """
        file = params.get("file")
        if not file:
            # Fail with a clear message instead of letting json.loads choke
            # on an empty placeholder value
            raise ValueError("No credential file provided")

        if isinstance(file, (str, bytes, bytearray)):
            service_account_info = json.loads(file)
        else:
            service_account_info = file
        if not isinstance(service_account_info, dict):
            raise ValueError("Credential file must contain a JSON object")

        credentials = service_account.Credentials.from_service_account_info(
            service_account_info
        )
        project_id = service_account_info.get("project_id")
        return bigquery.Client(credentials=credentials, project=project_id)

    @staticmethod
    def checkConnection(params: dict) -> bool:
        """
        Connection for bigQuery database
        :param params: connection params, "file" being the credential file
            (service account JSON) from bigquery
        :returns: True if a client could be built from the credentials, else False

        NOTE(review): only credentials parsing and client construction are
        exercised here, no query is executed - confirm whether a real round
        trip to BigQuery is expected from this check.
        """
        res = True

        try:
            client = BigQuery._getClient(params)
            # The client is only needed to validate the credentials, release it
            client.close()
        except Exception as ex:
            logger.error("Can't connect to db with this credentials %s", str(ex))
            res = False

        return res

    @staticmethod
    def fetchDataframe(params: dict, sql: str, limit: bool = False):
        """
        Fetches data using given config file and sql
        :param params: connection params, "file" being the credential file
            (service account JSON) from bigquery
        :param sql: string sql query
        :param limit: when True the sql is passed through limitSql before running
        :returns: dataframe, or None if anything failed (the error is logged)
        """
        dataframe = None
        try:
            if limit:
                sql = limitSql(sql)
            client = BigQuery._getClient(params)
            try:
                dataframe = client.query(sql).to_dataframe()
            finally:
                # Release the underlying transport whether or not the query
                # succeeded; a failure here is still caught and logged below
                client.close()
        except Exception as ex:
            logger.error("Error in fetching data. Error:%s", str(ex))

        return dataframe