django/contrib/gis/geos/prototypes/io.py
import threading
from ctypes import POINTER, Structure, byref, c_byte, c_char_p, c_int, c_size_t
from django.contrib.gis.geos.base import GEOSBase
from django.contrib.gis.geos.libgeos import (
GEOM_PTR,
GEOSFuncFactory,
geos_version_tuple,
)
from django.contrib.gis.geos.prototypes.errcheck import (
check_geom,
check_sized_string,
check_string,
)
from django.contrib.gis.geos.prototypes.geom import c_uchar_p, geos_char_p
from django.utils.encoding import force_bytes
from django.utils.functional import SimpleLazyObject
# ### The WKB/WKT Reader/Writer structures and pointers ###
class WKTReader_st(Structure):
pass
class WKTWriter_st(Structure):
pass
class WKBReader_st(Structure):
pass
class WKBWriter_st(Structure):
pass
WKT_READ_PTR = POINTER(WKTReader_st)
WKT_WRITE_PTR = POINTER(WKTWriter_st)
WKB_READ_PTR = POINTER(WKBReader_st)
WKB_WRITE_PTR = POINTER(WKBReader_st)
# WKTReader routines
wkt_reader_create = GEOSFuncFactory("GEOSWKTReader_create", restype=WKT_READ_PTR)
wkt_reader_destroy = GEOSFuncFactory("GEOSWKTReader_destroy", argtypes=[WKT_READ_PTR])
wkt_reader_read = GEOSFuncFactory(
"GEOSWKTReader_read",
argtypes=[WKT_READ_PTR, c_char_p],
restype=GEOM_PTR,
errcheck=check_geom,
)
# WKTWriter routines
wkt_writer_create = GEOSFuncFactory("GEOSWKTWriter_create", restype=WKT_WRITE_PTR)
wkt_writer_destroy = GEOSFuncFactory("GEOSWKTWriter_destroy", argtypes=[WKT_WRITE_PTR])
wkt_writer_write = GEOSFuncFactory(
"GEOSWKTWriter_write",
argtypes=[WKT_WRITE_PTR, GEOM_PTR],
restype=geos_char_p,
errcheck=check_string,
)
wkt_writer_get_outdim = GEOSFuncFactory(
"GEOSWKTWriter_getOutputDimension", argtypes=[WKT_WRITE_PTR], restype=c_int
)
wkt_writer_set_outdim = GEOSFuncFactory(
"GEOSWKTWriter_setOutputDimension", argtypes=[WKT_WRITE_PTR, c_int]
)
wkt_writer_set_trim = GEOSFuncFactory(
"GEOSWKTWriter_setTrim", argtypes=[WKT_WRITE_PTR, c_byte]
)
wkt_writer_set_precision = GEOSFuncFactory(
"GEOSWKTWriter_setRoundingPrecision", argtypes=[WKT_WRITE_PTR, c_int]
)
# WKBReader routines
wkb_reader_create = GEOSFuncFactory("GEOSWKBReader_create", restype=WKB_READ_PTR)
wkb_reader_destroy = GEOSFuncFactory("GEOSWKBReader_destroy", argtypes=[WKB_READ_PTR])
class WKBReadFunc(GEOSFuncFactory):
# Although the function definitions take `const unsigned char *`
# as their parameter, we use c_char_p here so the function may
# take Python strings directly as parameters. Inside Python there
# is not a difference between signed and unsigned characters, so
# it is not a problem.
argtypes = [WKB_READ_PTR, c_char_p, c_size_t]
restype = GEOM_PTR
errcheck = staticmethod(check_geom)
wkb_reader_read = WKBReadFunc("GEOSWKBReader_read")
wkb_reader_read_hex = WKBReadFunc("GEOSWKBReader_readHEX")
# WKBWriter routines
wkb_writer_create = GEOSFuncFactory("GEOSWKBWriter_create", restype=WKB_WRITE_PTR)
wkb_writer_destroy = GEOSFuncFactory("GEOSWKBWriter_destroy", argtypes=[WKB_WRITE_PTR])
# WKB Writing prototypes.
class WKBWriteFunc(GEOSFuncFactory):
argtypes = [WKB_WRITE_PTR, GEOM_PTR, POINTER(c_size_t)]
restype = c_uchar_p
errcheck = staticmethod(check_sized_string)
wkb_writer_write = WKBWriteFunc("GEOSWKBWriter_write")
wkb_writer_write_hex = WKBWriteFunc("GEOSWKBWriter_writeHEX")
# WKBWriter property getter/setter prototypes.
class WKBWriterGet(GEOSFuncFactory):
argtypes = [WKB_WRITE_PTR]
restype = c_int
class WKBWriterSet(GEOSFuncFactory):
argtypes = [WKB_WRITE_PTR, c_int]
wkb_writer_get_byteorder = WKBWriterGet("GEOSWKBWriter_getByteOrder")
wkb_writer_set_byteorder = WKBWriterSet("GEOSWKBWriter_setByteOrder")
wkb_writer_get_outdim = WKBWriterGet("GEOSWKBWriter_getOutputDimension")
wkb_writer_set_outdim = WKBWriterSet("GEOSWKBWriter_setOutputDimension")
wkb_writer_get_include_srid = WKBWriterGet(
"GEOSWKBWriter_getIncludeSRID", restype=c_byte
)
wkb_writer_set_include_srid = WKBWriterSet(
"GEOSWKBWriter_setIncludeSRID", argtypes=[WKB_WRITE_PTR, c_byte]
)
# ### Base I/O Class ###
class IOBase(GEOSBase):
"Base class for GEOS I/O objects."
def __init__(self):
# Getting the pointer with the constructor.
self.ptr = self._constructor()
# Loading the real destructor function at this point as doing it in
# __del__ is too late (import error).
self.destructor.func
# ### Base WKB/WKT Reading and Writing objects ###
# Non-public WKB/WKT reader classes for internal use because
# their `read` methods return _pointers_ instead of GEOSGeometry
# objects.
class _WKTReader(IOBase):
_constructor = wkt_reader_create
ptr_type = WKT_READ_PTR
destructor = wkt_reader_destroy
def read(self, wkt):
if not isinstance(wkt, (bytes, str)):
raise TypeError
return wkt_reader_read(self.ptr, force_bytes(wkt))
class _WKBReader(IOBase):
_constructor = wkb_reader_create
ptr_type = WKB_READ_PTR
destructor = wkb_reader_destroy
def read(self, wkb):
"Return a _pointer_ to C GEOS Geometry object from the given WKB."
if isinstance(wkb, memoryview):
wkb_s = bytes(wkb)
return wkb_reader_read(self.ptr, wkb_s, len(wkb_s))
elif isinstance(wkb, bytes):
return wkb_reader_read_hex(self.ptr, wkb, len(wkb))
elif isinstance(wkb, str):
wkb_s = wkb.encode()
return wkb_reader_read_hex(self.ptr, wkb_s, len(wkb_s))
else:
raise TypeError
def default_trim_value():
"""
GEOS changed the default value in 3.12.0. Can be replaced by True when
3.12.0 becomes the minimum supported version.
"""
return geos_version_tuple() >= (3, 12)
DEFAULT_TRIM_VALUE = SimpleLazyObject(default_trim_value)
# ### WKB/WKT Writer Classes ###
class WKTWriter(IOBase):
_constructor = wkt_writer_create
ptr_type = WKT_WRITE_PTR
destructor = wkt_writer_destroy
_precision = None
def __init__(self, dim=2, trim=False, precision=None):
super().__init__()
self._trim = DEFAULT_TRIM_VALUE
self.trim = trim
if precision is not None:
self.precision = precision
self.outdim = dim
def write(self, geom):
"Return the WKT representation of the given geometry."
return wkt_writer_write(self.ptr, geom.ptr)
@property
def outdim(self):
return wkt_writer_get_outdim(self.ptr)
@outdim.setter
def outdim(self, new_dim):
if new_dim not in (2, 3):
raise ValueError("WKT output dimension must be 2 or 3")
wkt_writer_set_outdim(self.ptr, new_dim)
@property
def trim(self):
return self._trim
@trim.setter
def trim(self, flag):
if bool(flag) != self._trim:
self._trim = bool(flag)
wkt_writer_set_trim(self.ptr, self._trim)
@property
def precision(self):
return self._precision
@precision.setter
def precision(self, precision):
if (not isinstance(precision, int) or precision < 0) and precision is not None:
raise AttributeError(
"WKT output rounding precision must be non-negative integer or None."
)
if precision != self._precision:
self._precision = precision
wkt_writer_set_precision(self.ptr, -1 if precision is None else precision)
class WKBWriter(IOBase):
_constructor = wkb_writer_create
ptr_type = WKB_WRITE_PTR
destructor = wkb_writer_destroy
geos_version = geos_version_tuple()
def __init__(self, dim=2):
super().__init__()
self.outdim = dim
def _handle_empty_point(self, geom):
from django.contrib.gis.geos import Point
if isinstance(geom, Point) and geom.empty:
if self.srid:
# PostGIS uses POINT(NaN NaN) for WKB representation of empty
# points. Use it for EWKB as it's a PostGIS specific format.
# https://trac.osgeo.org/postgis/ticket/3181
geom = Point(float("NaN"), float("NaN"), srid=geom.srid)
else:
raise ValueError("Empty point is not representable in WKB.")
return geom
def write(self, geom):
"Return the WKB representation of the given geometry."
geom = self._handle_empty_point(geom)
wkb = wkb_writer_write(self.ptr, geom.ptr, byref(c_size_t()))
return memoryview(wkb)
def write_hex(self, geom):
"Return the HEXEWKB representation of the given geometry."
geom = self._handle_empty_point(geom)
wkb = wkb_writer_write_hex(self.ptr, geom.ptr, byref(c_size_t()))
return wkb
# ### WKBWriter Properties ###
# Property for getting/setting the byteorder.
def _get_byteorder(self):
return wkb_writer_get_byteorder(self.ptr)
def _set_byteorder(self, order):
if order not in (0, 1):
raise ValueError(
"Byte order parameter must be 0 (Big Endian) or 1 (Little Endian)."
)
wkb_writer_set_byteorder(self.ptr, order)
byteorder = property(_get_byteorder, _set_byteorder)
# Property for getting/setting the output dimension.
@property
def outdim(self):
return wkb_writer_get_outdim(self.ptr)
@outdim.setter
def outdim(self, new_dim):
if new_dim not in (2, 3):
raise ValueError("WKB output dimension must be 2 or 3")
wkb_writer_set_outdim(self.ptr, new_dim)
# Property for getting/setting the include srid flag.
@property
def srid(self):
return bool(wkb_writer_get_include_srid(self.ptr))
@srid.setter
def srid(self, include):
wkb_writer_set_include_srid(self.ptr, bool(include))
# `ThreadLocalIO` object holds instances of the WKT and WKB reader/writer
# objects that are local to the thread. The `GEOSGeometry` internals
# access these instances by calling the module-level functions, defined
# below.
class ThreadLocalIO(threading.local):
wkt_r = None
wkt_w = None
wkb_r = None
wkb_w = None
ewkb_w = None
thread_context = ThreadLocalIO()
# These module-level routines return the I/O object that is local to the
# thread. If the I/O object does not exist yet it will be initialized.
def wkt_r():
thread_context.wkt_r = thread_context.wkt_r or _WKTReader()
return thread_context.wkt_r
def wkt_w(dim=2, trim=False, precision=None):
if not thread_context.wkt_w:
thread_context.wkt_w = WKTWriter(dim=dim, trim=trim, precision=precision)
else:
thread_context.wkt_w.outdim = dim
thread_context.wkt_w.trim = trim
thread_context.wkt_w.precision = precision
return thread_context.wkt_w
def wkb_r():
thread_context.wkb_r = thread_context.wkb_r or _WKBReader()
return thread_context.wkb_r
def wkb_w(dim=2):
if not thread_context.wkb_w:
thread_context.wkb_w = WKBWriter(dim=dim)
else:
thread_context.wkb_w.outdim = dim
return thread_context.wkb_w
def ewkb_w(dim=2):
if not thread_context.ewkb_w:
thread_context.ewkb_w = WKBWriter(dim=dim)
thread_context.ewkb_w.srid = True
else:
thread_context.ewkb_w.outdim = dim
return thread_context.ewkb_w