tlsfuzzer/tlslite-ng

View on GitHub
tlslite/bufferedsocket.py

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
# Copyright (c) 2016, Hubert Kario
#
# See the LICENSE file for legal information regarding use of this file.

"""Wrapper around the socket.socket interface that provides buffering"""

from collections import deque


class BufferedSocket(object):
    """
    Socket that will buffer reads and writes to a real socket object

    When buffer_writes is enabled, writes won't be passed to the real socket
    until flush() is called.

    Not multithread safe.

    :vartype buffer_writes: boolean
    :ivar buffer_writes: whether to buffer data writes, False by default
    """

    def __init__(self, socket):
        """Associate socket with the object"""
        self.socket = socket
        self._write_queue = deque()
        self.buffer_writes = False
        self._read_buffer = bytearray()

    def send(self, data):
        """Send data to the socket"""
        if self.buffer_writes:
            self._write_queue.append(data)
            return len(data)
        return self.socket.send(data)

    def sendall(self, data):
        """Send data to the socket"""
        if self.buffer_writes:
            self._write_queue.append(data)
            return None
        return self.socket.sendall(data)

    def flush(self):
        """Send all buffered data"""
        buf = bytearray()
        for i in self._write_queue:
            buf += i
        self._write_queue.clear()
        if buf:
            self.socket.sendall(buf)

    def recv(self, bufsize):
        """Receive data from socket (socket emulation)"""
        if not self._read_buffer:
            self._read_buffer += self.socket.recv(max(4096, bufsize))
        ret = self._read_buffer[:bufsize]
        del self._read_buffer[:bufsize]
        return ret

    def getsockname(self):
        """Return the socket's own address (socket emulation)."""
        return self.socket.getsockname()

    def getpeername(self):
        """
        Return the remote address to which the socket is connected

        (socket emulation)
        """
        return self.socket.getpeername()

    def settimeout(self, value):
        """Set a timeout on blocking socket operations (socket emulation)."""
        return self.socket.settimeout(value)

    def gettimeout(self):
        """
        Return the timeout associated with socket operations

        (socket emulation)
        """
        return self.socket.gettimeout()

    def setsockopt(self, level, optname, value):
        """Set the value of the given socket option (socket emulation)."""
        return self.socket.setsockopt(level, optname, value)

    def shutdown(self, how):
        """Shutdown the underlying socket."""
        self.flush()
        return self.socket.shutdown(how)

    def close(self):
        """Close the underlying socket."""
        self.flush()
        return self.socket.close()