GoVanguard/legion

View on GitHub
ui/models/processmodels.py

Summary

Maintainability
F
3 days
Test Coverage
#!/usr/bin/env python

"""
LEGION (https://gotham-security.com)
Copyright (c) 2023 Gotham Security

    This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
    License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
    version.

    This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
    warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
    details.

    You should have received a copy of the GNU General Public License along with this program.
    If not, see <http://www.gnu.org/licenses/>.
"""

import re
from PyQt6 import QtWidgets, QtGui, QtCore

from app.ModelHelpers import resolveHeaders, itemInteractive
from app.auxiliary import *                                                 # for bubble sort

class ProcessesTableModel(QtCore.QAbstractTableModel):
    
    def __init__(self, controller, processes = [[]], headers = [], parent = None):
        QtCore.QAbstractTableModel.__init__(self, parent)
        self.__headers = headers
        self.__processes = processes
        self.__controller = controller
        
    def setProcesses(self, processes):
        self.__processes = processes
        
    def getProcesses(self):
        return self.__processes

    def rowCount(self, parent):
        return len(self.__processes)

    def columnCount(self, parent):
        if len(self.__processes) != 0:
            return len(self.__processes[0])
        return 0

    def headerData(self, section, orientation, role):
        return resolveHeaders(role, orientation, section, self.__headers)

    # this method takes care of how the information is displayed
    def data(self, index, role):
        if role == QtCore.Qt.ItemDataRole.DisplayRole or role == QtCore.Qt.ItemDataRole.EditRole: # how to display each cell
            value = ''
            row = index.row()
            column = index.column()
            processColumns = {0: 'progress', 1: 'display', 2: 'elapsed', 3: 'estimatedRemaining',
                              4: 'pid', 5: 'name', 6: 'tabTitle', 7: 'hostIp', 8: 'port', 9: 'protocol', 10: 'command',
                              11: 'startTime', 12: 'endTime', 13: 'outputfile', 14: 'output', 15: 'status',
                              16: 'closed'}
            try:
                if column == 0:
                    value = ''
                elif column == 2:
                    pid = int(self.__processes[row]['pid'])
                    elapsed = round(self.__controller.controller.processMeasurements.get(pid, 0), 2)
                    value = "{0:.2f}{1}".format(float(elapsed), "s")
                elif column == 3:
                    status = str(self.__processes[row]['status'])
                    if status == "Finished" or status == "Crashed" or status == "Killed":
                        estimatedRemaining = 0
                    else:
                        pid = int(self.__processes[row]['pid'])
                        elapsed = round(self.__controller.controller.processMeasurements.get(pid, 0), 2)
                        estimatedRemaining = int(self.__processes[row]['estimatedRemaining']) - float(elapsed)
                    value = "{0:.2f}{1}"\
                        .format(float(estimatedRemaining), "s") if estimatedRemaining >= 0 else 'Unknown'
                elif column == 6:
                    if not self.__processes[row]['tabTitle'] == '':
                        value = self.__processes[row]['tabTitle']
                    else:
                        value = self.__processes[row]['name']
                elif column == 8:
                    if not self.__processes[row]['port'] == '' and not self.__processes[row]['protocol'] == '':
                        value = self.__processes[row]['port'] + '/' + self.__processes[row]['protocol']
                    else:
                        value = self.__processes[row]['port']
                elif column == 16:
                    value = ""
                else:
                    try:
                        value = self.__processes[row][processColumns.get(int(column))]
                    except:
                        value = "Missing data c #{0} - {1}".format(int(column), processColumns.get(int(column)))
                        pass
            except Exception:
                value = "Missing data c #{0} - {1}".format(int(column), processColumns.get(int(column)))
                pass
            return value

    def sort(self, Ncol, order):
        self.layoutAboutToBeChanged.emit()
        array=[]

        sortColumns = {5:'name', 6:'tabTitle', 11:'startTime', 12:'endTime'}
        field = sortColumns.get(int(Ncol)) or 'status'

        try:
            if Ncol == 7:
                for i in range(len(self.__processes)):
                    array.append(IP2Int(self.__processes[i]['hostIp']))

            elif Ncol == 8:
                for i in range(len(self.__processes)):
                    if self.__processes[i]['port'] == '':
                        return
                    else:
                        array.append(int(self.__processes[i]['port']))
            else:
                for i in range(len(self.__processes)):
                    array.append(self.__processes[i][field])
        
            sortArrayWithArray(array, self.__processes)  # sort the services based on the values in the array

            if order == Qt.SortOrder.AscendingOrder:                                  # reverse if needed
                self.__processes.reverse()
                self.__controller.processesTableViewSort = 'desc'
            else:
                self.__controller.processesTableViewSort = 'asc'

            self.__controller.processesTableViewSortColumn = field

        ## Extra?
        #self.__controller.updateProcessesIcon()  # to make sure the progress GIF is displayed in the right place
            self.layoutChanged.emit()
        except:
            log.error("Failed to sort")
            pass

    # method that allows views to know how to treat each item, eg: if it should be enabled, editable, selectable etc
    def flags(self, index):
        return itemInteractive()

    def setDataList(self, processes):
        self.__processes = processes
        self.layoutAboutToBeChanged.emit()
        self.dataChanged.emit(self.createIndex(0, 0), self.createIndex(self.rowCount(0), self.columnCount(0)))
        self.layoutChanged.emit()

    ### getter functions ###

    def getProcessPidForRow(self, row):
        return self.__processes[row]['pid']
        
    def getProcessPidForId(self, dbId):
        for i in range(len(self.__processes)):
            if str(self.__processes[i]['id']) == str(dbId):
                return self.__processes[i]['pid']

    def getProcessStatusForRow(self, row):
        return self.__processes[row]['status']

    def getProcessStatusForPid(self, pid):
        for i in range(len(self.__processes)):
            if str(self.__processes[i]['pid']) == str(pid):
                return self.__processes[i]['status']
                
    def getProcessStatusForId(self, dbId):
        for i in range(len(self.__processes)):
            if str(self.__processes[i]['id']) == str(dbId):
                return self.__processes[i]['status']

    def getProcessIdForRow(self, row):
        return self.__processes[row]['id']
        
    def getToolNameForRow(self, row):
        return self.__processes[row]['name']
        
    def getRowForToolName(self, toolname):
        for i in range(len(self.__processes)):
            if self.__processes[i]['name'] == toolname:
                return i

    def getRowForDBId(self, dbid):  # new
        for i in range(len(self.__processes)):
            if self.__processes[i]['id'] == dbid:
                return i

    def getIpForRow(self, row):
        return self.__processes[row]['hostIp']

    def getPortForRow(self, row):
        return self.__processes[row]['port']

    def getProtocolForRow(self, row):
        return self.__processes[row]['protocol']
        
    def getOutputfileForRow(self, row):
        return self.__processes[row]['outputfile']