python/src/odin_data/shared_buffer_manager.py
import posix_ipc
import mmap
import ctypes
import os
import errno
from struct import Struct
class SharedBufferManagerException(Exception):
def __init__(self, msg, errno=None):
self.msg = msg
self.errno = errno
def __str__(self):
return str(self.msg)
class SharedBufferManager(object):
Header = Struct('QQQ')
_last_manager_id = 0x100
boost_mmap_path = '/tmp/boost_interprocess'
def __init__(self, shared_mem_name, shared_mem_size=0, buffer_size=0,
remove_when_deleted=False, boost_mmap_mode=False):
self.remove_when_deleted = remove_when_deleted
self.shared_mem = None
self.mmap_file = None
self.mapfile = None
if shared_mem_size:
total_size = shared_mem_size + SharedBufferManager.Header.size
shm_flags = posix_ipc.O_CREX
mmap_file_mode = 'w+b'
else:
total_size = 0
shm_flags = 0
mmap_file_mode = 'r+b'
if boost_mmap_mode:
# Create the boost mmap file directory if it doesn't exist alread
try:
os.makedirs(SharedBufferManager.boost_mmap_path)
except OSError as e:
if e.errno != errno.EEXIST:
raise SharedBufferManagerException(str(e))
shared_mem_name = os.path.join(SharedBufferManager.boost_mmap_path, shared_mem_name)
if shared_mem_size and os.path.exists(shared_mem_name):
raise SharedBufferManagerException("Shared memory with the specified name already exists")
try:
self.mmap_file = open(shared_mem_name, mmap_file_mode)
except IOError as e:
if e.errno == 2:
raise SharedBufferManagerException("No shared memory exists with the specified name")
else:
raise SharedBufferManagerException(str(e))
if shared_mem_size:
self.mmap_file.truncate(total_size)
mmap_size = 0
mmap_fd = self.mmap_file.fileno()
else:
try:
self.shared_mem = posix_ipc.SharedMemory(
shared_mem_name, flags=shm_flags, mode=0o755, size=total_size)
mmap_size = self.shared_mem.size
mmap_fd = self.shared_mem.fd
except posix_ipc.ExistentialError as e:
raise SharedBufferManagerException(str(e))
except posix_ipc.Error as e:
raise SharedBufferManagerException(str(e))
except ValueError as e:
raise SharedBufferManagerException(str(e))
self.mapfile = mmap.mmap(mmap_fd, mmap_size, access=mmap.ACCESS_WRITE)
self.manager_id = ctypes.c_int64.from_buffer(self.mapfile)
self.num_buffers = ctypes.c_int64.from_buffer(self.mapfile, 8)
self.buffer_size = ctypes.c_int64.from_buffer(self.mapfile, 16)
if shared_mem_size:
self.manager_id.value = self.__class__._last_manager_id
self.__class__._last_manager_id += 1
self.num_buffers.value = int(shared_mem_size / buffer_size)
self.buffer_size.value = buffer_size
self.mapfile.seek(0)
def get_manager_id(self):
return self.manager_id.value
def get_num_buffers(self):
return self.num_buffers.value
def get_buffer_size(self):
return self.buffer_size.value
def get_buffer_address(self, buffer_index):
if buffer_index < 0 or buffer_index >= self.num_buffers.value:
raise SharedBufferManagerException("Illegal buffer index specified: " + str(buffer_index))
return SharedBufferManager.Header.size + (self.buffer_size.value * buffer_index)
def read_buffer(self, buffer_index, num_bytes=1, offset=0):
buf_addr = self.get_buffer_address(buffer_index)
cur_pos = self.mapfile.tell()
start_addr = buf_addr + offset
self.mapfile.seek(start_addr)
data = self.mapfile.read(num_bytes)
self.mapfile.seek(cur_pos)
return data
def write_buffer(self, buffer_index, data, offset=0):
buf_addr = self.get_buffer_address(buffer_index)
cur_pos = self.mapfile.tell()
start_addr = buf_addr + offset
self.mapfile.seek(start_addr)
self.mapfile.write(data)
self.mapfile.seek(cur_pos)
def __del__(self):
for mapped_ctype_name in ('manager_id', 'num_buffers', 'buffer_size'):
mapped_ctype = getattr(self, mapped_ctype_name, None)
if mapped_ctype is not None:
del(mapped_ctype)
if self.mapfile:
try:
self.mapfile.close()
except BufferError:
# Trap bug in python3 mmap that doesn't release all ctypes pointers
pass
if self.mmap_file:
self.mmap_file.close()
if self.remove_when_deleted:
os.remove(self.mmap_file.name)
if self.shared_mem:
self.shared_mem.close_fd()
if self.remove_when_deleted:
self.shared_mem.unlink()