# pyxcp/master/master.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Lowlevel API reflecting available XCP services.
.. note:: For technical reasons the API is split into two parts;
common methods (this file) and a Python version specific part.
.. [1] XCP Specification, Part 2 - Protocol Layer Specification
"""
import functools
import struct
import traceback
import warnings
from time import sleep
from typing import Any
from typing import Callable
from typing import Collection
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from pyxcp import checksum
from pyxcp import types
from pyxcp.config import Configuration
from pyxcp.constants import makeBytePacker
from pyxcp.constants import makeByteUnpacker
from pyxcp.constants import makeDLongPacker
from pyxcp.constants import makeDLongUnpacker
from pyxcp.constants import makeDWordPacker
from pyxcp.constants import makeDWordUnpacker
from pyxcp.constants import makeWordPacker
from pyxcp.constants import makeWordUnpacker
from pyxcp.constants import PackerType
from pyxcp.constants import UnpackerType
from pyxcp.logger import Logger
from pyxcp.master.errorhandler import disable_error_handling
from pyxcp.master.errorhandler import wrapped
from pyxcp.transport.base import createTransport
from pyxcp.utils import decode_bytes
from pyxcp.utils import delay
from pyxcp.utils import SHORT_SLEEP
def broadcasted(func: Callable) -> Callable:
    """Pass-through decorator: returns ``func`` unchanged.

    NOTE(review): presumably only a marker for commands that may be
    broadcast; no behavior is attached here -- confirm intended use.
    """
    return func
class _MissingSlaveProperty(AttributeError, KeyError):
    """Unknown slave property.

    Derives from both ``AttributeError`` (so ``hasattr``/``getattr`` with a
    default, ``copy`` and ``pickle`` behave normally) and ``KeyError`` (so
    existing callers catching ``KeyError`` keep working).
    """


class SlaveProperties(dict):
    """Container class for fixed parameters, like byte-order, maxCTO, ...

    A plain ``dict`` whose items are additionally readable and writable as
    attributes: ``props.maxCto`` is the same as ``props["maxCto"]``.
    """

    def __init__(self, *args, **kws):
        super().__init__(*args, **kws)

    def __getattr__(self, name):
        # Only invoked when regular attribute lookup fails, so dict methods
        # and dunder attributes defined on the class are never shadowed.
        try:
            return self[name]
        except KeyError:
            raise _MissingSlaveProperty(name) from None

    def __setattr__(self, name, value):
        # Every attribute assignment is stored as a dictionary item.
        self[name] = value

    def __getstate__(self):
        # Hand out a plain copy rather than the instance itself.
        return dict(self)

    def __setstate__(self, state):
        # Restore the items; rebinding the local name ``self`` would do nothing.
        self.update(state)
class Master:
"""Common part of lowlevel XCP API.
Parameters
----------
transportName : str
XCP transport layer name ['can', 'eth', 'sxi']
config: dict
"""
PARAMETER_MAP = {
    # Name: (type, required, default)
    "LOGLEVEL": (str, False, "WARN"),
    "DISABLE_ERROR_HANDLING": (
        bool,
        False,
        False,
    ),  # Bypass error-handling for performance reasons.
    # Read in __init__ and stored as seedNKeyDLL / seedNKeyDLL_same_bit_width.
    "SEED_N_KEY_DLL": (str, False, ""),
    "SEED_N_KEY_DLL_SAME_BIT_WIDTH": (bool, False, False),
    # When true, disconnect() does not insist on a response from the slave.
    "DISCONNECT_RESPONSE_OPTIONAL": (bool, False, False),
}
def __init__(self, transportName, config=None, policy=None):
    """Set up configuration, logging, the transport layer and bookkeeping state.

    Parameters
    ----------
    transportName : str
        Handed to `createTransport`; also kept as `transport_name`.
    config : dict, optional
        Passed together with `PARAMETER_MAP` to `Configuration` and, unchanged,
        to `createTransport`.
    policy : optional
        Passed through to `createTransport`.
    """
    self.ctr = 0
    self.succeeded = True  # Cleared by __exit__ when the `with` body raised.
    self.config = Configuration(self.PARAMETER_MAP or {}, config or {})
    self.logger = Logger("master.Master", level=self.config.get("LOGLEVEL"))
    disable_error_handling(self.config.get("DISABLE_ERROR_HANDLING"))
    self.transport = createTransport(transportName, config, policy)
    self.transport_name = transportName
    # In some cases the transport-layer needs to communicate with us.
    self.transport.parent = self
    self.service = None
    # (D)Word (un-)packers are byte-order dependent
    # -- byte-order is returned by CONNECT_Resp (COMM_MODE_BASIC)
    # All packers stay None until connect() has created them.
    self.BYTE_pack = None
    self.BYTE_unpack = None
    self.WORD_pack = None
    self.WORD_unpack = None
    self.DWORD_pack = None
    self.DWORD_unpack = None
    self.DLONG_pack = None
    self.DLONG_unpack = None
    self.AG_pack = None
    self.AG_unpack = None
    # self.connected = False
    self.mta = types.MtaType(None, None)  # Updated by setMta().
    self.currentDaqPtr = None  # Updated by setDaqPtr().
    self.currentProtectionStatus = None
    self.seedNKeyDLL = self.config.get("SEED_N_KEY_DLL")
    self.seedNKeyDLL_same_bit_width = self.config.get("SEED_N_KEY_DLL_SAME_BIT_WIDTH")
    self.disconnect_response_optional = self.config.get("DISCONNECT_RESPONSE_OPTIONAL")
    self.slaveProperties = SlaveProperties()
    self.slaveProperties.pgmProcessor = SlaveProperties()
    self.slaveProperties.transport_layer = self.transport_name.upper()
def __enter__(self):
    """Context manager entry part; returns this instance unchanged."""
    return self
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager exit part.

    Always closes the master. If the ``with`` body raised, `succeeded` is
    cleared and the formatted traceback is logged as an error. Nothing truthy
    is returned, so the exception is not suppressed.
    """
    self.close()
    if exc_type is not None:
        self.succeeded = False
        formatted = traceback.format_exception(exc_type, exc_val, exc_tb)
        self.logger.error("".join(formatted))
def _setService(self, service):
    """Records the currently processed service.

    Parameters
    ----------
    service: `pyxcp.types.Command`

    Note
    ----
    Internal Function, only to be used by transport-layer.
    """
    self.service = service
def close(self):
    """Finalize the transport policy, then close the transport layer connection.

    The transport is closed even when finalizing the policy raises; the
    exception still propagates to the caller afterwards.
    """
    try:
        self.transport.policy.finalize()
    finally:
        # Never leave the transport open because of a failing policy.
        self.transport.close()
# Mandatory Commands.
@wrapped
def connect(self, mode=0x00):
    """Build up connection to an XCP slave.

    Before the actual XCP traffic starts a connection is required. The
    response is cached in `slaveProperties`, and the byte-order dependent
    (un-)packers as well as the address-granularity packers are created here.

    Parameters
    ----------
    mode : int
        connection mode; default is 0x00 (normal mode). Only the low byte is sent.

    Returns
    -------
    :py:obj:`pyxcp.types.ConnectResponse`
        Describes fundamental client properties.

    Note
    ----
    Every XCP slave supports at most one connection,
    more attempts to connect are silently ignored.
    """
    self.transport.connect()
    response = self.transport.request(types.Command.CONNECT, mode & 0xFF)

    # The byte order has to be known before the full response can be parsed.
    byteOrder = types.ConnectResponsePartial.parse(response).commModeBasic.byteOrder
    result = types.ConnectResponse.parse(response, byteOrder=byteOrder)
    prefix = "<" if byteOrder == types.ByteOrder.INTEL else ">"

    props = self.slaveProperties
    props.byteOrder = byteOrder
    props.maxCto = result.maxCto
    props.maxDto = result.maxDto
    props.supportsPgm = result.resource.pgm
    props.supportsStim = result.resource.stim
    props.supportsDaq = result.resource.daq
    props.supportsCalpag = result.resource.calpag
    props.slaveBlockMode = result.commModeBasic.slaveBlockMode
    props.addressGranularity = result.commModeBasic.addressGranularity
    props.protocolLayerVersion = result.protocolLayerVersion
    props.transportLayerVersion = result.transportLayerVersion
    props.optionalCommMode = result.commModeBasic.optional
    if props.maxCto < 10:
        props.maxWriteDaqMultipleElements = 0
    else:
        props.maxWriteDaqMultipleElements = int((props.maxCto - 2) // 8)

    self.BYTE_pack, self.BYTE_unpack = makeBytePacker(prefix), makeByteUnpacker(prefix)
    self.WORD_pack, self.WORD_unpack = makeWordPacker(prefix), makeWordUnpacker(prefix)
    self.DWORD_pack, self.DWORD_unpack = makeDWordPacker(prefix), makeDWordUnpacker(prefix)
    self.DLONG_pack, self.DLONG_unpack = makeDLongPacker(prefix), makeDLongUnpacker(prefix)

    # Download/Upload commands are using element- not byte-count.
    props.bytesPerElement = None
    granularity = props.addressGranularity
    if granularity == types.AddressGranularity.BYTE:
        self.AG_pack = struct.Struct("<B").pack
        self.AG_unpack = struct.Struct("<B").unpack
        props.bytesPerElement = 1
    elif granularity == types.AddressGranularity.WORD:
        self.AG_pack, self.AG_unpack = self.WORD_pack, self.WORD_unpack
        props.bytesPerElement = 2
    elif granularity == types.AddressGranularity.DWORD:
        self.AG_pack, self.AG_unpack = self.DWORD_pack, self.DWORD_unpack
        props.bytesPerElement = 4
    # self.connected = True
    return result
@wrapped
def disconnect(self):
    """Releases the connection to the XCP slave.

    Thereafter, no further communication with the slave is possible
    (besides `connect`).

    Note
    -----
    - If DISCONNECT is currently not possible, ERR_CMD_BUSY will be returned.
    - While XCP spec. requires a response, this behavior can be made optional by adding
        - `DISCONNECT_RESPONSE_OPTIONAL = true` (TOML)
        - `"DISCONNECT_RESPONSE_OPTIONAL": true` (JSON)
      to your configuration file.
    """
    # Pick the request flavour once, then issue the command.
    if self.disconnect_response_optional:
        send = self.transport.request_optional_response
    else:
        send = self.transport.request
    # self.connected = False
    return send(types.Command.DISCONNECT)
@wrapped
def getStatus(self):
    """Get current status information of the slave device.

    This includes the status of the resource protection, pending store
    requests and the general status of data acquisition and stimulation.
    The reported resource protection status is forwarded to
    `_setProtectionStatus`.

    Returns
    -------
    :obj:`pyxcp.types.GetStatusResponse`
    """
    raw = self.transport.request(types.Command.GET_STATUS)
    status = types.GetStatusResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    self._setProtectionStatus(status.resourceProtectionStatus)
    return status
@wrapped
def synch(self):
    """Synchronize command execution after timeout conditions.

    Returns
    -------
    The transport's response to the SYNCH command.
    """
    return self.transport.request(types.Command.SYNCH)
@wrapped
def getCommModeInfo(self):
    """Get optional information on different Communication Modes supported
    by the slave.

    The values are also cached in `slaveProperties` (`interleavedMode`,
    `masterBlockMode`, `maxBs`, `minSt`, `queueSize`, `xcpDriverVersionNumber`).

    Returns
    -------
    :obj:`pyxcp.types.GetCommModeInfoResponse`
    """
    props = self.slaveProperties
    raw = self.transport.request(types.Command.GET_COMM_MODE_INFO)
    info = types.GetCommModeInfoResponse.parse(raw, byteOrder=props.byteOrder)
    optional = info.commModeOptional
    props.interleavedMode = optional.interleavedMode
    props.masterBlockMode = optional.masterBlockMode
    props.maxBs = info.maxBs
    props.minSt = info.minSt
    props.queueSize = info.queueSize
    props.xcpDriverVersionNumber = info.xcpDriverVersionNumber
    return info
@wrapped
def getId(self, mode: int):
    """This command is used for automatic session configuration and for
    slave device identification.

    Parameters
    ----------
    mode : int
        The following identification types may be requested:

        - 0        ASCII text
        - 1        ASAM-MC2 filename without path and extension
        - 2        ASAM-MC2 filename with path and extension
        - 3        URL where the ASAM-MC2 file can be found
        - 4        ASAM-MC2 file to upload
        - 128..255 User defined

    Returns
    -------
    :obj:`pyxcp.types.GetIDResponse`
        Its `length` is overwritten with the DWORD decoded from
        bytes 3..6 of the raw response.
    """
    raw = self.transport.request(types.Command.GET_ID, mode)
    identification = types.GetIDResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    (identification.length,) = self.DWORD_unpack(raw[3:7])[:1]
    return identification
@wrapped
def setRequest(self, mode: int, sessionConfigurationId: int):
    """Request to save to non-volatile memory.

    Parameters
    ----------
    mode : int (bitfield)
        - 1  Request to store calibration data
        - 2  Request to store DAQ list, no resume
        - 4  Request to store DAQ list, resume enabled
        - 8  Request to clear DAQ configuration
    sessionConfigurationId : int
        Sent as two bytes, high byte first.
    """
    # NOTE(review): the id is always sent high byte first, independent of the
    # byte order reported by the slave -- confirm this is intended.
    idHigh = sessionConfigurationId >> 8
    idLow = sessionConfigurationId & 0xFF
    return self.transport.request(types.Command.SET_REQUEST, mode, idHigh, idLow)
@wrapped
def getSeed(self, first: int, resource: int):
    """Get seed from slave for unlocking a protected resource.

    Parameters
    ----------
    first : int
        - 0 - first part of seed
        - 1 - remaining part
    resource : int
        - Mode == 0 - Resource
        - Mode == 1 - Don't care

    Returns
    -------
    `pyxcp.types.GetSeedResponse`
    """
    response = self.transport.request(types.Command.GET_SEED, first, resource)
    if self.transport_name != "can":
        return types.GetSeedResponse.parse(response, byteOrder=self.slaveProperties.byteOrder)

    # On CAN the seed may be longer than the max DLC. The first byte then
    # holds the remaining seed size, followed by as many seed bytes as fit
    # into the current frame; the master has to call getSeed repeatedly
    # until the complete seed has been received.
    size = response[0]
    seed = response[1:]
    if size < len(seed):
        seed = seed[:size]
    # Build a well-formed response with a zeroed seed, then attach the bytes
    # that were actually received in this frame.
    placeholder = types.GetSeedResponse.build({"length": size, "seed": bytes(size)})
    reply = types.GetSeedResponse.parse(placeholder, byteOrder=self.slaveProperties.byteOrder)
    reply.seed = seed
    return reply
@wrapped
def unlock(self, length: int, key: bytes):
    """Send key to slave for unlocking a protected resource.

    Parameters
    ----------
    length : int
        indicates the (remaining) number of key bytes.
    key : bytes

    Returns
    -------
    :obj:`pyxcp.types.ResourceType`
        Also forwarded to `_setProtectionStatus`.

    Note
    ----
    The master has to use :meth:`unlock` in a defined sequence together
    with :meth:`getSeed`. The master only can send an :meth:`unlock` sequence
    if previously there was a :meth:`getSeed` sequence. The master has
    to send the first `unlocking` after a :meth:`getSeed` sequence with
    a Length containing the total length of the key.
    """
    raw = self.transport.request(types.Command.UNLOCK, length, *key)
    protection = types.ResourceType.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    self._setProtectionStatus(protection)
    return protection
@wrapped
def setMta(self, address: int, addressExt: int = 0x00):
    """Set Memory Transfer Address in slave.

    Parameters
    ----------
    address : int
    addressExt : int

    Note
    ----
    The MTA is used by :meth:`buildChecksum`, :meth:`upload`, :meth:`download`, :meth:`downloadNext`,
    :meth:`downloadMax`, :meth:`modifyBits`, :meth:`programClear`, :meth:`program`, :meth:`programNext`
    and :meth:`programMax`.
    """
    # Keep track of MTA (needed for error-handling).
    self.mta = types.MtaType(address, addressExt)
    packedAddress = self.DWORD_pack(address)
    request = self.transport.request
    return request(types.Command.SET_MTA, 0, 0, addressExt, *packedAddress)
@wrapped
def upload(self, length: int):
    """Transfer data from slave to master.

    Parameters
    ----------
    length : int
        Number of elements (address granularity).

    Note
    ----
    Address is set via :meth:`setMta` (Some services like :meth:`getId` also set the MTA).

    Returns
    -------
    bytes
    """
    byte_count = length * self.slaveProperties.bytesPerElement
    response = self.transport.request(types.Command.UPLOAD, length)
    if byte_count > (self.slaveProperties.maxCto - 1):
        # Does not fit into a single CTO: collect the rest in block mode.
        block_response = self.transport.block_receive(length_required=(byte_count - len(response)))
        response += block_response
    elif self.transport_name == "can":
        # larger sizes will send in multiple CAN messages
        # each valid message will start with 0xFF followed by the upload bytes
        # the last message might be padded to the required DLC
        rem = byte_count - len(response)
        # `rem` is negative when the first frame already carries more bytes
        # than requested (padding); a plain truthiness test would then never
        # terminate.
        while rem > 0:
            if len(self.transport.resQueue):
                data = self.transport.resQueue.popleft()
                response += data[1 : rem + 1]
                rem = byte_count - len(response)
            else:
                sleep(SHORT_SLEEP)
        # Drop padding bytes; a no-op when exactly byte_count bytes arrived.
        response = response[:byte_count]
    return response
@wrapped
def shortUpload(self, length: int, address: int, addressExt: int = 0x00):
    """Transfer data from slave to master.

    As opposed to :meth:`upload` this service also includes address information.
    A warning is logged when the requested size exceeds ``maxCto - 1`` bytes;
    the request is sent nevertheless.

    Parameters
    ----------
    length : int
        Number of elements (address granularity).
    address : int
    addressExt : int

    Returns
    -------
    bytes
        The response, cut to the requested number of bytes.
    """
    packedAddress = self.DWORD_pack(address)
    props = self.slaveProperties
    byte_count = length * props.bytesPerElement
    max_byte_count = props.maxCto - 1
    if not byte_count <= max_byte_count:
        self.logger.warn(f"SHORT_UPLOAD: {byte_count} bytes exceeds the maximum value of {max_byte_count}.")
    raw = self.transport.request(types.Command.SHORT_UPLOAD, length, 0, addressExt, *packedAddress)
    return raw[:byte_count]
@wrapped
def buildChecksum(self, blocksize: int):
    """Build checksum over memory range.

    Parameters
    ----------
    blocksize : int

    Returns
    -------
    :obj:`~pyxcp.types.BuildChecksumResponse`

    .. note:: Address is set via `setMta`

    See Also
    --------
    :mod:`~pyxcp.checksum`
    """
    packedSize = self.DWORD_pack(blocksize)
    raw = self.transport.request(types.Command.BUILD_CHECKSUM, 0, 0, 0, *packedSize)
    byteOrder = self.slaveProperties.byteOrder
    return types.BuildChecksumResponse.parse(raw, byteOrder=byteOrder)
@wrapped
def transportLayerCmd(self, subCommand: int, *data: List[bytes]):
    """Execute transfer-layer specific command.

    The command is sent with `request_optional_response`.

    Parameters
    ----------
    subCommand : int
    data : bytes
        Passed on as individual positional arguments.

    Note
    ----
    For details refer to XCP specification.
    """
    send = self.transport.request_optional_response
    return send(types.Command.TRANSPORT_LAYER_CMD, subCommand, *data)
@wrapped
def userCmd(self, subCommand: int, data: bytes):
    """Execute proprietary command implemented in your XCP client.

    Parameters
    ----------
    subCommand : int
    data : bytes
        Sent byte by byte after the sub-command.

    .. note:: For details refer to your XCP client vendor.
    """
    return self.transport.request(types.Command.USER_CMD, subCommand, *data)
@wrapped
def getVersion(self):
    """Get version information.

    This command returns detailed information about the implemented
    protocol layer version of the XCP slave and the transport layer
    currently in use. The four version numbers are also cached in
    `slaveProperties`.

    Returns
    -------
    :obj:`~types.GetVersionResponse`
    """
    props = self.slaveProperties
    raw = self.transport.request(types.Command.GET_VERSION)
    version = types.GetVersionResponse.parse(raw, byteOrder=props.byteOrder)
    props.protocolMajor = version.protocolMajor
    props.protocolMinor = version.protocolMinor
    props.transportMajor = version.transportMajor
    props.transportMinor = version.transportMinor
    return version
def fetch(self, length: int, limitPayload: int = None):  # TODO: pull
    """Convenience function for data-transfer from slave to master
    (Not part of the XCP Specification).

    The data is read with repeated :meth:`upload` calls: as many full chunks
    as fit into `length`, followed by one call for the remainder.

    Parameters
    ----------
    length : int
    limitPayload : int
        transfer less bytes than supported by transport-layer

    Returns
    -------
    bytes

    Raises
    ------
    ValueError
        If `limitPayload` is given and smaller than 8.

    Note
    ----
    address is not included because of services implicitly setting address information like :meth:`getId` .
    """
    if limitPayload and limitPayload < 8:
        raise ValueError(f"Payload must be at least 8 bytes - given: {limitPayload}")

    props = self.slaveProperties
    maxPayload = 255 if props.slaveBlockMode else props.maxCto - 1
    chunkSize = min(limitPayload, maxPayload) if limitPayload else maxPayload

    fullChunks, remainder = divmod(length, chunkSize)
    collected = bytearray()
    for _ in range(fullChunks):
        collected.extend(self.upload(chunkSize))
    if remainder:
        collected.extend(self.upload(remainder))
    return bytes(collected)


pull = fetch  # fetch() may be completely replaced by pull() someday.
def push(self, address: int, data: bytes, callback=None):
    """Convenience function for data-transfer from master to slave.
    (Not part of the XCP Specification).

    Delegates to `_generalized_downloader` using :meth:`download` /
    :meth:`downloadNext` and the limits stored in `slaveProperties`
    (`maxBs`, `minSt` and `masterBlockMode` are stored by
    :meth:`getCommModeInfo`).

    Parameters
    ----------
    address: int
    data : bytes
        Arbitrary number of bytes.
    callback : callable, optional
        Progress callback, handed to `_generalized_downloader`.
    """
    props = self.slaveProperties
    self._generalized_downloader(
        address=address,
        data=data,
        maxCto=props.maxCto,
        maxBs=props.maxBs,
        minSt=props.minSt,
        master_block_mode=props.masterBlockMode,
        dl_func=self.download,
        dl_next_func=self.downloadNext,
        callback=callback,
    )
def flash_program(self, address: int, data: bytes, callback=None):
    """Convenience function for flash programming.
    (Not part of the XCP Specification).

    Delegates to `_generalized_downloader` using `program` / `programNext`
    and the limits stored in `slaveProperties.pgmProcessor`.

    Parameters
    ----------
    address: int
    data : bytes
        Arbitrary number of bytes.
    callback : callable, optional
        Progress callback, handed to `_generalized_downloader`.
    """
    pgm = self.slaveProperties.pgmProcessor
    self._generalized_downloader(
        address=address,
        data=data,
        maxCto=pgm.maxCtoPgm,
        maxBs=pgm.maxBsPgm,
        minSt=pgm.minStPgm,
        master_block_mode=pgm.masterBlockMode,
        dl_func=self.program,
        dl_next_func=self.programNext,
        callback=callback,
    )
def _generalized_downloader(
    self,
    address: int,
    data: bytes,
    maxCto: int,
    maxBs: int,
    minSt: int,
    master_block_mode: bool,
    dl_func,
    dl_next_func,
    callback=None,
):
    """Shared implementation behind :meth:`push` and :meth:`flash_program`.

    Sets the MTA to `address`, then sends `data` either block-wise through
    `_block_downloader` (master block mode) or chunk-wise through `dl_func`.

    Parameters
    ----------
    address : int
    data : bytes
    maxCto : int
        A payload of ``maxCto - 2`` bytes is used per packet.
    maxBs : int
        Packets per block; only used in master block mode.
    minSt : int
        Divided by 10000.0 before it is handed to `_block_downloader`.
    master_block_mode : bool
    dl_func, dl_next_func : callable
        Called as ``f(data, length, last)``.
    callback : callable, optional
        Called with an integer progress counter that starts at 1.
    """
    self.setMta(address)
    minSt /= 10000.0
    block_downloader = functools.partial(
        self._block_downloader,
        dl_func=dl_func,
        dl_next_func=dl_next_func,
        minSt=minSt,
    )
    total_length = len(data)
    if master_block_mode:
        # A block is at most 255 bytes long.
        max_payload = min(maxBs * (maxCto - 2), 255)
    else:
        max_payload = maxCto - 2
    offset = 0
    if master_block_mode:
        remaining = total_length
        blocks = range(total_length // max_payload)
        percent_complete = 1
        remaining_block_size = total_length % max_payload
        for _ in blocks:
            block = data[offset : offset + max_payload]
            block_downloader(block)
            offset += max_payload
            remaining -= max_payload
            # The counter advances by at most one step per transferred block.
            if callback and remaining <= total_length - (total_length / 100) * percent_complete:
                callback(percent_complete)
                percent_complete += 1
        if remaining_block_size:
            # Trailing partial block.
            block = data[offset : offset + remaining_block_size]
            block_downloader(block)
            if callback:
                callback(percent_complete)
    else:
        chunk_size = max_payload
        chunks = range(total_length // chunk_size)
        remaining = total_length % chunk_size
        percent_complete = 1
        callback_remaining = total_length
        for _ in chunks:
            block = data[offset : offset + max_payload]
            # Without block mode every chunk is flagged as the last one.
            dl_func(block, max_payload, last=True)
            offset += max_payload
            callback_remaining -= chunk_size
            if callback and callback_remaining <= total_length - (total_length / 100) * percent_complete:
                callback(percent_complete)
                percent_complete += 1
        if remaining:
            # Trailing partial chunk.
            block = data[offset : offset + remaining]
            dl_func(block, remaining, last=True)
            if callback:
                callback(percent_complete)
def _block_downloader(self, data: bytes, dl_func=None, dl_next_func=None, minSt=0):
    """Re-usable block downloader.

    The first packet of a block is sent with `dl_func` and announces the
    length of the whole block; every following packet, including a trailing
    partial one, is sent with `dl_next_func`.

    Parameters
    ----------
    data : bytes
        Arbitrary number of bytes.
    dl_func: method
        usually :meth:`download` or :meth:`program`
    dl_next_func: method
        usually :meth:`downloadNext` or :meth:`programNext`
    minSt: int
        Minimum separation time of frames, handed to `delay` after each packet.
    """
    length = len(data)
    # NOTE(review): always derived from slaveProperties.maxCto, also when the
    # caller works with the PGM limits (maxCtoPgm) -- confirm this is intended.
    max_packet_size = self.slaveProperties.maxCto - 2  # Command ID + Length
    packets = range(length // max_packet_size)
    offset = 0
    remaining = length % max_packet_size
    remaining_block_size = length
    for index in packets:
        packet_data = data[offset : offset + max_packet_size]
        last = (remaining_block_size - max_packet_size) == 0
        if index == 0:
            dl_func(packet_data, length, last)  # Transmit the complete length in the first CTO.
        else:
            dl_next_func(packet_data, remaining_block_size, last)
        offset += max_packet_size
        remaining_block_size -= max_packet_size
        delay(minSt)
    if remaining:
        packet_data = data[offset : offset + remaining]
        if offset == 0:
            # Nothing was sent yet: length of data is smaller than maxCto - 2.
            # (Testing the loop index here would also match the case of exactly
            # one full packet already sent, re-starting the block by mistake.)
            dl_func(packet_data, remaining, last=True)
        else:
            dl_next_func(packet_data, remaining, last=True)
        delay(minSt)
@wrapped
def download(self, data: bytes, blockModeLength=None, last=False):
    """Transfer data from master to slave.

    Parameters
    ----------
    data : bytes
        Data to send to slave.
    blockModeLength : int or None
        for block mode, the download request must contain the length of the whole block,
        not just the length in the current packet. The whole block length can be given here for block-mode
        transfers. For normal mode, the length indicates the actual packet's payload length.
    last : bool
        When true the packet is sent in standard mode (a response is awaited),
        regardless of `blockModeLength`.

    Returns
    -------
    The response in standard mode, ``None`` in block mode.

    Raises
    ------
    TypeError
        If block mode is selected and `blockModeLength` is not an int.

    Note
    ----
    Address is set via :meth:`setMta`
    """
    useBlockMode = blockModeLength is not None and not last
    if not useBlockMode:
        # standard mode: the length is that of this packet's payload
        return self.transport.request(types.Command.DOWNLOAD, len(data), *data)

    # block mode: no response is awaited for this packet
    if not isinstance(blockModeLength, int):
        raise TypeError("blockModeLength must be int!")
    self.transport.block_request(types.Command.DOWNLOAD, blockModeLength, *data)
    return None
@wrapped
def downloadNext(self, data: bytes, remainingBlockLength, last=False):
    """Transfer data from master to slave (block mode).

    Parameters
    ----------
    data : bytes
    remainingBlockLength : int
        This parameter has to be given the remaining length in the block
    last : bool
        The block mode implementation shall indicate the last packet in the block with this parameter, because
        the slave device will send the response after this.

    Returns
    -------
    The response for the last packet of a block, otherwise ``None``.
    """
    if not last:
        # the slave device won't respond to consecutive DOWNLOAD_NEXT packets in block mode,
        # so we must not wait for any response
        self.transport.block_request(types.Command.DOWNLOAD_NEXT, remainingBlockLength, *data)
        return None
    # last DOWNLOAD_NEXT packet in a block: the slave device has to send the response after this.
    return self.transport.request(types.Command.DOWNLOAD_NEXT, remainingBlockLength, *data)
@wrapped
def downloadMax(self, data: bytes):
    """Transfer data from master to slave (fixed size).

    Parameters
    ----------
    data : bytes
        Sent byte by byte; no length field is added.
    """
    command = types.Command.DOWNLOAD_MAX
    return self.transport.request(command, *data)
@wrapped
def shortDownload(self, address, addressExt, data):
    """Transfer data from master to slave, including address information.

    Parameters
    ----------
    address : int
        Packed as DWORD.
    addressExt : int
    data : bytes
        Its length is sent ahead of the address.
    """
    packedAddress = self.DWORD_pack(address)
    return self.transport.request(
        types.Command.SHORT_DOWNLOAD,
        len(data),
        0,
        addressExt,
        *packedAddress,
        *data,
    )
@wrapped
def modifyBits(self, shiftValue, andMask, xorMask):
    """Send MODIFY_BITS with a shift value and WORD-packed AND / XOR masks.

    Parameters
    ----------
    shiftValue : int
    andMask : int
    xorMask : int
    """
    # A = ( (A) & ((~((dword)(((word)~MA)<<S))) )^((dword)(MX<<S)) )
    masks = (*self.WORD_pack(andMask), *self.WORD_pack(xorMask))
    return self.transport.request(types.Command.MODIFY_BITS, shiftValue, *masks)
# Page Switching Commands (PAG)
@wrapped
def setCalPage(self, mode: int, logicalDataSegment: int, logicalDataPage: int):
    """Set calibration page.

    Parameters
    ----------
    mode : int (bitfield)
        - 0x01 - The given page will be used by the slave device application.
        - 0x02 - The slave device XCP driver will access the given page.
        - 0x80 - The logical segment number is ignored. The command applies to all segments
    logicalDataSegment : int
    logicalDataPage : int
    """
    arguments = (mode, logicalDataSegment, logicalDataPage)
    return self.transport.request(types.Command.SET_CAL_PAGE, *arguments)
@wrapped
def getCalPage(self, mode: int, logicalDataSegment: int):
    """Get calibration page

    Parameters
    ----------
    mode : int
    logicalDataSegment : int

    Returns
    -------
    The element at index 2 of the response.
    """
    return self.transport.request(types.Command.GET_CAL_PAGE, mode, logicalDataSegment)[2]
@wrapped
def getPagProcessorInfo(self):
    """Get general information on PAG processor.

    Returns
    -------
    `pyxcp.types.GetPagProcessorInfoResponse`
    """
    raw = self.transport.request(types.Command.GET_PAG_PROCESSOR_INFO)
    byteOrder = self.slaveProperties.byteOrder
    return types.GetPagProcessorInfoResponse.parse(raw, byteOrder=byteOrder)
@wrapped
def getSegmentInfo(self, mode, segmentNumber, segmentInfo, mappingIndex):
    """Get specific information for a segment.

    Parameters
    ----------
    mode : int
        - 0 = get basic address info for this segment
        - 1 = get standard info for this segment
        - 2 = get address mapping info for this segment
    segmentNumber : int
    segmentInfo : int
        Mode 0:
            - 0 = address
            - 1 = length
        Mode 1:
            - don't care
        Mode 2:
            - 0 = source address
            - 1 = destination address
            - 2 = length address
    mappingIndex : int
        - Mode 0: don't care
        - Mode 1: don't care
        - Mode 2: identifier for address mapping range that mapping_info belongs to.

    Returns
    -------
    The parsed response type matching `mode`; ``None`` for any other mode.
    """
    raw = self.transport.request(
        types.Command.GET_SEGMENT_INFO, mode, segmentNumber, segmentInfo, mappingIndex
    )
    # Select the response layout that belongs to the requested mode.
    if mode == 0:
        layout = types.GetSegmentInfoMode0Response
    elif mode == 1:
        layout = types.GetSegmentInfoMode1Response
    elif mode == 2:
        layout = types.GetSegmentInfoMode2Response
    else:
        return None
    return layout.parse(raw, byteOrder=self.slaveProperties.byteOrder)
@wrapped
def getPageInfo(self, segmentNumber, pageNumber):
    """Get specific information for a page.

    Parameters
    ----------
    segmentNumber : int
    pageNumber : int

    Returns
    -------
    tuple
        ``(page properties parsed from the first response byte, second response byte)``
    """
    raw = self.transport.request(types.Command.GET_PAGE_INFO, 0, segmentNumber, pageNumber)
    firstByte = bytes([raw[0]])
    properties = types.PageProperties.parse(firstByte, byteOrder=self.slaveProperties.byteOrder)
    return (properties, raw[1])
@wrapped
def setSegmentMode(self, mode, segmentNumber):
    """Set mode for a segment.

    Parameters
    ----------
    mode : int (bitfield)
        1 = enable FREEZE Mode
    segmentNumber : int
    """
    command = types.Command.SET_SEGMENT_MODE
    return self.transport.request(command, mode, segmentNumber)
@wrapped
def getSegmentMode(self, segmentNumber):
    """Get mode for a segment.

    Parameters
    ----------
    segmentNumber : int

    Returns
    -------
    The element at index 1 of the response.
    """
    return self.transport.request(types.Command.GET_SEGMENT_MODE, 0, segmentNumber)[1]
@wrapped
def copyCalPage(self, srcSegment, srcPage, dstSegment, dstPage):
    """Copy page.

    Parameters
    ----------
    srcSegment : int
    srcPage : int
    dstSegment : int
    dstPage : int
    """
    source = (srcSegment, srcPage)
    destination = (dstSegment, dstPage)
    return self.transport.request(types.Command.COPY_CAL_PAGE, *source, *destination)
# DAQ
@wrapped
def setDaqPtr(self, daqListNumber, odtNumber, odtEntryNumber):
    """Send SET_DAQ_PTR and remember the pointer in `currentDaqPtr`.

    Parameters
    ----------
    daqListNumber : int
        Packed as WORD.
    odtNumber : int
    odtEntryNumber : int
    """
    # Needed for errorhandling.
    self.currentDaqPtr = types.DaqPtr(daqListNumber, odtNumber, odtEntryNumber)
    packedList = self.WORD_pack(daqListNumber)
    return self.transport.request(types.Command.SET_DAQ_PTR, 0, *packedList, odtNumber, odtEntryNumber)
@wrapped
def clearDaqList(self, daqListNumber):
    """Clear DAQ list configuration.

    Parameters
    ----------
    daqListNumber : int
        Packed as WORD.
    """
    packedList = self.WORD_pack(daqListNumber)
    request = self.transport.request
    return request(types.Command.CLEAR_DAQ_LIST, 0, *packedList)
@wrapped
def writeDaq(self, bitOffset, entrySize, addressExt, address):
    """Write element in ODT entry.

    Parameters
    ----------
    bitOffset : int
        Position of bit in 32-bit variable referenced by the address and
        extension below
    entrySize : int
    addressExt : int
    address : int
        Packed as DWORD.
    """
    packedAddress = self.DWORD_pack(address)
    header = (bitOffset, entrySize, addressExt)
    return self.transport.request(types.Command.WRITE_DAQ, *header, *packedAddress)
@wrapped
def setDaqListMode(self, mode, daqListNumber, eventChannelNumber, prescaler, priority):
    """Send SET_DAQ_LIST_MODE for a DAQ list.

    Parameters
    ----------
    mode : int
    daqListNumber : int
        Packed as WORD.
    eventChannelNumber : int
        Packed as WORD.
    prescaler : int
    priority : int
    """
    packedList = self.WORD_pack(daqListNumber)
    packedEvent = self.WORD_pack(eventChannelNumber)
    return self.transport.request(
        types.Command.SET_DAQ_LIST_MODE,
        mode,
        *packedList,
        *packedEvent,
        prescaler,
        priority,
    )
@wrapped
def getDaqListMode(self, daqListNumber):
    """Get mode from DAQ list.

    Parameters
    ----------
    daqListNumber : int
        Packed as WORD.

    Returns
    -------
    `pyxcp.types.GetDaqListModeResponse`
    """
    packedList = self.WORD_pack(daqListNumber)
    raw = self.transport.request(types.Command.GET_DAQ_LIST_MODE, 0, *packedList)
    byteOrder = self.slaveProperties.byteOrder
    return types.GetDaqListModeResponse.parse(raw, byteOrder=byteOrder)
@wrapped
def startStopDaqList(self, mode, daqListNumber):
    """Start /stop/select DAQ list.

    Parameters
    ----------
    mode : int
        0 = stop
        1 = start
        2 = select
    daqListNumber : int
        Packed as WORD.

    Returns
    -------
    `pyxcp.types.StartStopDaqListResponse`
    """
    packedList = self.WORD_pack(daqListNumber)
    raw = self.transport.request(types.Command.START_STOP_DAQ_LIST, mode, *packedList)
    byteOrder = self.slaveProperties.byteOrder
    return types.StartStopDaqListResponse.parse(raw, byteOrder=byteOrder)
@wrapped
def startStopSynch(self, mode):
    """Start/stop DAQ lists (synchronously).

    Parameters
    ----------
    mode : int
        0 = stop all
        1 = start selected
        2 = stop selected
    """
    command = types.Command.START_STOP_SYNCH
    return self.transport.request(command, mode)
@wrapped
def writeDaqMultiple(self, daqElements):
    """Write multiple elements in ODT.

    The payload consists of the number of elements followed by each element
    serialized with `types.DaqElement`.

    Parameters
    ----------
    daqElements : list of `dict` containing the following keys: *bitOffset*, *size*, *address*, *addressExt*.

    Raises
    ------
    ValueError
        If more elements are given than `slaveProperties.maxWriteDaqMultipleElements` (set by :meth:`connect`).
    """
    limit = self.slaveProperties.maxWriteDaqMultipleElements
    if len(daqElements) > limit:
        raise ValueError(f"At most {self.slaveProperties.maxWriteDaqMultipleElements} daqElements are permitted.")
    payload = bytearray()
    payload.append(len(daqElements))
    for element in daqElements:
        serialized = types.DaqElement.build(element, byteOrder=self.slaveProperties.byteOrder)
        payload.extend(serialized)
    return self.transport.request(types.Command.WRITE_DAQ_MULTIPLE, *payload)
# optional
@wrapped
def getDaqClock(self):
    """Get DAQ clock from slave.

    Returns
    -------
    int
        Current timestamp, format specified by `getDaqResolutionInfo`
    """
    raw = self.transport.request(types.Command.GET_DAQ_CLOCK)
    clock = types.GetDaqClockResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    return clock.timestamp
@wrapped
def readDaq(self):
    """Read element from ODT entry.

    Returns
    -------
    `pyxcp.types.ReadDaqResponse`
    """
    raw = self.transport.request(types.Command.READ_DAQ)
    byteOrder = self.slaveProperties.byteOrder
    return types.ReadDaqResponse.parse(raw, byteOrder=byteOrder)
@wrapped
def getDaqProcessorInfo(self):
    """Get general information on DAQ processor.

    Returns
    -------
    `pyxcp.types.GetDaqProcessorInfoResponse`
    """
    raw = self.transport.request(types.Command.GET_DAQ_PROCESSOR_INFO)
    byteOrder = self.slaveProperties.byteOrder
    return types.GetDaqProcessorInfoResponse.parse(raw, byteOrder=byteOrder)
@wrapped
def getDaqResolutionInfo(self):
    """Get general information on DAQ processing resolution.

    Returns
    -------
    `pyxcp.types.GetDaqResolutionInfoResponse`
    """
    raw = self.transport.request(types.Command.GET_DAQ_RESOLUTION_INFO)
    byteOrder = self.slaveProperties.byteOrder
    return types.GetDaqResolutionInfoResponse.parse(raw, byteOrder=byteOrder)
@wrapped
def getDaqListInfo(self, daqListNumber):
    """Get specific information for a DAQ list.

    Parameters
    ----------
    daqListNumber : int
        Packed as WORD.

    Returns
    -------
    `pyxcp.types.GetDaqListInfoResponse`
    """
    packedList = self.WORD_pack(daqListNumber)
    raw = self.transport.request(types.Command.GET_DAQ_LIST_INFO, 0, *packedList)
    byteOrder = self.slaveProperties.byteOrder
    return types.GetDaqListInfoResponse.parse(raw, byteOrder=byteOrder)
@wrapped
def getDaqEventInfo(self, eventChannelNumber):
    """Get specific information for an event channel.

    Parameters
    ----------
    eventChannelNumber : int
        Packed as WORD.

    Returns
    -------
    `pyxcp.types.GetEventChannelInfoResponse`
    """
    packedChannel = self.WORD_pack(eventChannelNumber)
    raw = self.transport.request(types.Command.GET_DAQ_EVENT_INFO, 0, *packedChannel)
    byteOrder = self.slaveProperties.byteOrder
    return types.GetEventChannelInfoResponse.parse(raw, byteOrder=byteOrder)
@wrapped
def dtoCtrProperties(self, modifier, eventChannel, relatedEventChannel, mode):
    """Send DTO_CTR_PROPERTIES.

    The payload is built in this order: `modifier` (one byte),
    `eventChannel` (WORD), `relatedEventChannel` (WORD), `mode` (one byte).

    Parameters
    ----------
    modifier :
    eventChannel : int
    relatedEventChannel : int
    mode :

    Returns
    -------
    `pyxcp.types.DtoCtrPropertiesResponse`
    """
    payload = bytearray()
    payload.append(modifier)
    for channel in (eventChannel, relatedEventChannel):
        payload.extend(self.WORD_pack(channel))
    payload.append(mode)
    raw = self.transport.request(types.Command.DTO_CTR_PROPERTIES, *payload)
    parsed = types.DtoCtrPropertiesResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    return parsed
@wrapped
def setDaqPackedMode(self, daqListNumber, daqPackedMode, dpmTimestampMode=None, dpmSampleCount=None):
    """Set DAQ List Packed Mode.

    Parameters
    ----------
    daqListNumber : int
        Number of the addressed DAQ list (sent as WORD).
    daqPackedMode : int
        Packed mode; for the values 1 and 2 the two `dpm*` parameters are
        appended to the request.
    dpmTimestampMode : int, optional
        Required if `daqPackedMode` is 1 or 2 (sent as one byte).
    dpmSampleCount : int, optional
        Required if `daqPackedMode` is 1 or 2 (sent as WORD).

    Returns
    -------
    The result of the transport layer request.

    Raises
    ------
    ValueError
        `daqPackedMode` is 1 or 2 but `dpmTimestampMode` or
        `dpmSampleCount` is missing.
    """
    needs_dpm_fields = daqPackedMode in (1, 2)
    if needs_dpm_fields and (dpmTimestampMode is None or dpmSampleCount is None):
        # Without this check `None` would end up in the frame and fail with an
        # unrelated packing error far away from the actual cause.
        raise ValueError("dpmTimestampMode and dpmSampleCount are required if daqPackedMode is 1 or 2.")
    params = []
    params.extend(self.WORD_pack(daqListNumber))
    params.append(daqPackedMode)
    if needs_dpm_fields:
        params.append(dpmTimestampMode)
        params.extend(self.WORD_pack(dpmSampleCount))
    return self.transport.request(types.Command.SET_DAQ_PACKED_MODE, *params)
@wrapped
def getDaqPackedMode(self, daqListNumber):
    """Get DAQ List Packed Mode.

    Returns information on the currently active packed mode of the
    addressed DAQ list.

    Parameters
    ----------
    daqListNumber : int
        Number of the addressed DAQ list (sent as WORD).

    Returns
    -------
    `pyxcp.types.GetDaqPackedModeResponse`
    """
    list_number = self.WORD_pack(daqListNumber)
    raw = self.transport.request(types.Command.GET_DAQ_PACKED_MODE, *list_number)
    parsed = types.GetDaqPackedModeResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    return parsed
# dynamic
@wrapped
def freeDaq(self):
    """Clear the dynamic DAQ configuration (FREE_DAQ).

    Returns
    -------
    The result of the transport layer request.
    """
    command = types.Command.FREE_DAQ
    return self.transport.request(command)
@wrapped
def allocDaq(self, daqCount):
    """Allocate DAQ lists (ALLOC_DAQ).

    Parameters
    ----------
    daqCount : int
        Number of DAQ lists to be allocated; sent as a WORD after one
        zero byte.

    Returns
    -------
    The result of the transport layer request.
    """
    count_bytes = self.WORD_pack(daqCount)
    return self.transport.request(types.Command.ALLOC_DAQ, 0, *count_bytes)
@wrapped
def allocOdt(self, daqListNumber, odtCount):
    """Send ALLOC_ODT for a DAQ list.

    Parameters
    ----------
    daqListNumber : int
        Sent as a WORD after one zero byte.
    odtCount : int
        Sent as the last argument of the request.

    Returns
    -------
    The result of the transport layer request.
    """
    list_number = self.WORD_pack(daqListNumber)
    request_args = (0, *list_number, odtCount)
    return self.transport.request(types.Command.ALLOC_ODT, *request_args)
@wrapped
def allocOdtEntry(self, daqListNumber, odtNumber, odtEntriesCount):
    """Send ALLOC_ODT_ENTRY for one ODT of a DAQ list.

    Parameters
    ----------
    daqListNumber : int
        Sent as a WORD after one zero byte.
    odtNumber : int
    odtEntriesCount : int

    Returns
    -------
    The result of the transport layer request.
    """
    list_number = self.WORD_pack(daqListNumber)
    request_args = (0, *list_number, odtNumber, odtEntriesCount)
    return self.transport.request(types.Command.ALLOC_ODT_ENTRY, *request_args)
# PGM
@wrapped
def programStart(self):
    """Indicate the beginning of a programming sequence.

    Besides returning the parsed response, the received programming
    parameters are copied to `self.slaveProperties.pgmProcessor`.

    Returns
    -------
    `pyxcp.types.ProgramStartResponse`
    """
    raw = self.transport.request(types.Command.PROGRAM_START)
    result = types.ProgramStartResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    # Fields taken over verbatim from the response.
    for field in ("commModePgm", "maxCtoPgm", "maxBsPgm", "minStPgm", "queueSizePgm"):
        setattr(self.slaveProperties.pgmProcessor, field, getattr(result, field))
    # Individual flags of the communication mode are mirrored as well.
    for flag in ("slaveBlockMode", "interleavedMode", "masterBlockMode"):
        setattr(self.slaveProperties.pgmProcessor, flag, getattr(result.commModePgm, flag))
    return result
@wrapped
def programClear(self, mode: int, clearRange: int):
    """Clear a part of non-volatile memory.

    Parameters
    ----------
    mode : int
        0x00 = the absolute access mode is active (default)
        0x01 = the functional access mode is active
    clearRange : int
        Sent as a DWORD after `mode` and two zero bytes.

    Returns
    -------
    The result of the transport layer request.
    """
    range_bytes = self.DWORD_pack(clearRange)
    # ERR_ACCESS_LOCKED
    return self.transport.request(types.Command.PROGRAM_CLEAR, mode, 0, 0, *range_bytes)
@wrapped
def program(self, data: bytes, blockLength, last=False):
    """Send a PROGRAM packet.

    Parameters
    ----------
    data : bytes
        Data to send to slave.
    blockLength : int
        The program request must contain the length of the whole block,
        not just the length in the current packet.
    last : bool
        Indicates that this is the only packet in the block, because
        the slave device will send the response after this.

    Returns
    -------
    The slave's response if `last` is true, else `None`.

    Note
    ----
    Address is set via :meth:`setMta`
    """
    if not last:
        # The slave device won't respond to this packet in block mode,
        # so we must not wait for any response.
        self.transport.block_request(types.Command.PROGRAM, blockLength, *data)
        return None
    # The slave device has to send the response after this packet.
    return self.transport.request(types.Command.PROGRAM, blockLength, *data)
@wrapped
def programReset(self, wait_for_optional_response=True):
    """Indicate the end of a programming sequence.

    Parameters
    ----------
    wait_for_optional_response : bool
        If true, the command is sent with `request_optional_response`,
        otherwise with `block_request`.

    Returns
    -------
    The result of the transport layer call that was used.
    """
    if not wait_for_optional_response:
        return self.transport.block_request(types.Command.PROGRAM_RESET)
    return self.transport.request_optional_response(types.Command.PROGRAM_RESET)
@wrapped
def getPgmProcessorInfo(self):
    """Get general information on PGM processor.

    `pgmProperties` and `maxSector` of the response are also stored on
    `self.slaveProperties.pgmProcessor`.

    Returns
    -------
    `pyxcp.types.GetPgmProcessorInfoResponse`
    """
    raw = self.transport.request(types.Command.GET_PGM_PROCESSOR_INFO)
    info = types.GetPgmProcessorInfoResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    for field in ("pgmProperties", "maxSector"):
        setattr(self.slaveProperties.pgmProcessor, field, getattr(info, field))
    return info
@wrapped
def getSectorInfo(self, mode, sectorNumber):
    """Get specific information for a sector.

    Parameters
    ----------
    mode : int
        Selects the response layout: 0 or 1 -> `GetSectorInfoResponseMode01`,
        2 -> `GetSectorInfoResponseMode2`.
    sectorNumber : int

    Returns
    -------
    The parsed response, or `None` if `mode` is none of 0, 1, 2 (the
    request is sent nevertheless).
    """
    raw = self.transport.request(types.Command.GET_SECTOR_INFO, mode, sectorNumber)
    if mode in (0, 1):
        return types.GetSectorInfoResponseMode01.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    if mode == 2:
        return types.GetSectorInfoResponseMode2.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    return None
@wrapped
def programPrepare(self, codesize):
    """Prepare non-volatile memory programming.

    Parameters
    ----------
    codesize : int
        Sent as a WORD after one zero byte.

    Returns
    -------
    The result of the transport layer request.
    """
    size_bytes = self.WORD_pack(codesize)
    return self.transport.request(types.Command.PROGRAM_PREPARE, 0x00, *size_bytes)
@wrapped
def programFormat(self, compressionMethod, encryptionMethod, programmingMethod, accessMethod):
    """Send PROGRAM_FORMAT with the four method selectors.

    Parameters
    ----------
    compressionMethod : int
    encryptionMethod : int
    programmingMethod : int
    accessMethod : int

    Returns
    -------
    The result of the transport layer request.
    """
    methods = (compressionMethod, encryptionMethod, programmingMethod, accessMethod)
    return self.transport.request(types.Command.PROGRAM_FORMAT, *methods)
@wrapped
def programNext(self, data: bytes, remainingBlockLength: int, last: bool = False):
    """Send a PROGRAM_NEXT packet.

    Parameters
    ----------
    data : bytes
        Data to send to slave.
    remainingBlockLength : int
        Sent as first argument of the request, in front of `data`.
    last : bool
        True for the last PROGRAM_NEXT packet in a block; only then a
        response is awaited.

    Returns
    -------
    The slave's response if `last` is true, else `None`.
    """
    if not last:
        # The slave device won't respond to consecutive PROGRAM_NEXT packets in block mode,
        # so we must not wait for any response.
        self.transport.block_request(types.Command.PROGRAM_NEXT, remainingBlockLength, *data)
        return None
    # Last PROGRAM_NEXT packet in a block: the slave device has to send the response after this.
    return self.transport.request(types.Command.PROGRAM_NEXT, remainingBlockLength, *data)
@wrapped
def programMax(self, data):
    """Send PROGRAM_MAX.

    Depending on the slave's address granularity, alignment bytes are put
    in front of the data (one zero byte for WORD, three for DWORD); each
    element of `data` is then packed with `AG_pack`.

    Parameters
    ----------
    data : iterable
        Elements to be programmed.

    Returns
    -------
    The result of the transport layer request.
    """
    granularity = self.slaveProperties.addressGranularity
    payload = bytearray()
    if granularity == types.AddressGranularity.WORD:
        payload.extend(b"\x00")  # alignment bytes
    elif granularity == types.AddressGranularity.DWORD:
        payload.extend(b"\x00\x00\x00")  # alignment bytes
    for element in data:
        payload.extend(self.AG_pack(element))
    return self.transport.request(types.Command.PROGRAM_MAX, *payload)
@wrapped
def programVerify(self, verMode, verType, verValue):
    """Send PROGRAM_VERIFY.

    Parameters
    ----------
    verMode : int
        Sent as first argument of the request.
    verType : int
        Sent as WORD.
    verValue : int
        Sent as DWORD.

    Returns
    -------
    The result of the transport layer request.
    """
    payload = bytearray()
    payload.extend(self.WORD_pack(verType))
    payload.extend(self.DWORD_pack(verValue))
    return self.transport.request(types.Command.PROGRAM_VERIFY, verMode, *payload)
# DBG
@wrapped
def dbgAttach(self):
    """Return detailed information about the implemented version of the
    SW-DBG feature of the XCP slave.

    Returns
    -------
    `pyxcp.types.DbgAttachResponse`
    """
    raw = self.transport.request(types.Command.DBG_ATTACH)
    parsed = types.DbgAttachResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    return parsed
@wrapped
def dbgGetVendorInfo(self):
    """Send DBG_GET_VENDOR_INFO.

    Returns
    -------
    `pyxcp.types.DbgGetVendorInfoResponse`
    """
    raw = self.transport.request(types.Command.DBG_GET_VENDOR_INFO)
    parsed = types.DbgGetVendorInfoResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    return parsed
@wrapped
def dbgGetModeInfo(self):
    """Send DBG_GET_MODE_INFO.

    Returns
    -------
    `pyxcp.types.DbgGetModeInfoResponse`
    """
    raw = self.transport.request(types.Command.DBG_GET_MODE_INFO)
    parsed = types.DbgGetModeInfoResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    return parsed
@wrapped
def dbgGetJtagId(self):
    """Send DBG_GET_JTAG_ID.

    Returns
    -------
    `pyxcp.types.DbgGetJtagIdResponse`
    """
    raw = self.transport.request(types.Command.DBG_GET_JTAG_ID)
    parsed = types.DbgGetJtagIdResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    return parsed
@wrapped
def dbgHaltAfterReset(self):
    """Send DBG_HALT_AFTER_RESET.

    Returns
    -------
    The result of the transport layer request.
    """
    command = types.Command.DBG_HALT_AFTER_RESET
    return self.transport.request(command)
@wrapped
def dbgGetHwioInfo(self, index: int):
    """Send DBG_GET_HWIO_INFO for `index`.

    Parameters
    ----------
    index : int

    Returns
    -------
    `pyxcp.types.DbgGetHwioInfoResponse`
    """
    raw = self.transport.request(types.Command.DBG_GET_HWIO_INFO, index)
    parsed = types.DbgGetHwioInfoResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    return parsed
@wrapped
def dbgSetHwioEvent(self, index: int, trigger: int):
    """Send DBG_SET_HWIO_EVENT.

    Parameters
    ----------
    index : int
    trigger : int

    Returns
    -------
    The result of the transport layer request.
    """
    request_args = (index, trigger)
    return self.transport.request(types.Command.DBG_SET_HWIO_EVENT, *request_args)
@wrapped
def dbgHwioControl(self, pins):
    """Send DBG_HWIO_CONTROL.

    The payload starts with the number of pins (one byte); for every pin
    its index (byte), state (byte) and frequency (WORD) follow.

    Parameters
    ----------
    pins : sequence
        Each element is indexable with `[0]` = index, `[1]` = state,
        `[2]` = frequency.

    Returns
    -------
    `pyxcp.types.DbgHwioControlResponse`
    """
    payload = bytearray()
    payload.extend(self.BYTE_pack(len(pins)))
    for pin in pins:
        payload.extend(self.BYTE_pack(pin[0]))  # index
        payload.extend(self.BYTE_pack(pin[1]))  # state
        payload.extend(self.WORD_pack(pin[2]))  # frequency
    raw = self.transport.request(types.Command.DBG_HWIO_CONTROL, *payload)
    parsed = types.DbgHwioControlResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    return parsed
@wrapped
def dbgExclusiveTargetAccess(self, mode: int, context: int):
    """Send DBG_EXCLUSIVE_TARGET_ACCESS.

    Parameters
    ----------
    mode : int
    context : int

    Returns
    -------
    The result of the transport layer request.
    """
    request_args = (mode, context)
    return self.transport.request(types.Command.DBG_EXCLUSIVE_TARGET_ACCESS, *request_args)
@wrapped
def dbgSequenceMultiple(self, mode: int, num: int, *seq):
    """Send DBG_SEQUENCE_MULTIPLE.

    Parameters
    ----------
    mode : int
        Sent as first argument of the request.
    num : int
        Sent as WORD.
    seq :
        Remaining positional arguments, appended to the request as given.

    Returns
    -------
    `pyxcp.types.DbgSequenceMultipleResponse`
    """
    # Unpack the packed word into single byte arguments, like every other
    # command in this class does, instead of handing over one bytes object.
    response = self.transport.request(types.Command.DBG_SEQUENCE_MULTIPLE, mode, *self.WORD_pack(num), *seq)
    return types.DbgSequenceMultipleResponse.parse(response, byteOrder=self.slaveProperties.byteOrder)
@wrapped
def dbgLlt(self, num: int, mode: int, *llts):
    """Send DBG_LLT.

    Parameters
    ----------
    num : int
    mode : int
    llts :
        Remaining positional arguments, appended to the request as given.

    Returns
    -------
    `pyxcp.types.DbgLltResponse`
    """
    request_args = (num, mode, *llts)
    raw = self.transport.request(types.Command.DBG_LLT, *request_args)
    parsed = types.DbgLltResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    return parsed
@wrapped
def dbgReadModifyWrite(self, tri: int, width: int, address: int, mask: int, data: int):
    """Send DBG_READ_MODIFY_WRITE.

    Payload layout: one zero byte, `tri`, `width`, two zero bytes, the
    address (DLONG), then `mask` and `data`, both packed according to
    `width` (1 -> BYTE, 2 -> WORD, 4 -> DWORD, 8 -> DLONG). For any other
    width neither mask nor data is appended.

    Parameters
    ----------
    tri : int
    width : int
    address : int
    mask : int
    data : int

    Returns
    -------
    `pyxcp.types.DbgReadModifyWriteResponse`
    """
    payload = bytearray()
    payload.extend(b"\x00")
    payload.append(tri)
    payload.append(width)
    payload.extend(b"\x00\x00")
    payload.extend(self.DLONG_pack(address))
    # Packers are looked up by name so that only the needed one is accessed.
    packer_name = {0x01: "BYTE_pack", 0x02: "WORD_pack", 0x04: "DWORD_pack", 0x08: "DLONG_pack"}.get(width)
    if packer_name is not None:
        for value in (mask, data):
            payload.extend(getattr(self, packer_name)(value))
    raw = self.transport.request(types.Command.DBG_READ_MODIFY_WRITE, *payload)
    return types.DbgReadModifyWriteResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder, width=width)
@wrapped
def dbgWrite(self, tri: int, width: int, address: int, data):
    """Send DBG_WRITE.

    Payload layout: one zero byte, `tri`, `width`, the number of elements
    (WORD), the address (DLONG), then every element of `data` packed
    according to `width` (1 -> BYTE, 2 -> WORD, 4 -> DWORD, 8 -> DLONG).
    For any other width no element is appended.

    `width` is remembered in `self._dbg_width`.

    Parameters
    ----------
    tri : int
    width : int
    address : int
    data : sized iterable of int

    Returns
    -------
    The result of the transport layer request.
    """
    payload = bytearray()
    payload.extend(b"\x00")
    payload.append(tri)
    self._dbg_width = width
    payload.append(width)
    payload.extend(self.WORD_pack(len(data)))
    payload.extend(self.DLONG_pack(address))
    # Packers are looked up by name so that only the needed one is accessed.
    packer_name = {0x01: "BYTE_pack", 0x02: "WORD_pack", 0x04: "DWORD_pack", 0x08: "DLONG_pack"}.get(width)
    if packer_name is not None:
        for element in data:
            payload.extend(getattr(self, packer_name)(element))
    return self.transport.request(types.Command.DBG_WRITE, *payload)
@wrapped
def dbgWriteNext(self, num: int, data: List[int]):
    """Send DBG_WRITE_NEXT.

    Payload layout: one zero byte, `num` (WORD), two zero bytes, then the
    first `num` elements of `data`, each packed according to
    `self._dbg_width` (1 -> BYTE, 2 -> WORD, 4 -> DWORD, 8 -> DLONG). For
    any other width no element is appended.

    Parameters
    ----------
    num : int
        Number of elements taken from `data`.
    data : List[int]
        Indexable collection of values; must hold at least `num` elements.

    Returns
    -------
    The result of the transport layer request.

    Note
    ----
    `self._dbg_width` is only assigned by commands taking a `width`
    argument (e.g. :meth:`dbgWrite`), so one of them has to be executed
    first.
    """
    d = bytearray()
    d.extend(b"\x00")
    d.extend(self.WORD_pack(num))
    d.extend(b"\x00\x00")
    packer_names = {0x01: "BYTE_pack", 0x02: "WORD_pack", 0x04: "DWORD_pack", 0x08: "DLONG_pack"}
    for i in range(num):
        # Width is read per element; packers are resolved lazily by name.
        packer_name = packer_names.get(self._dbg_width)
        if packer_name is not None:
            d.extend(getattr(self, packer_name)(data[i]))
    return self.transport.request(types.Command.DBG_WRITE_NEXT, *d)
@wrapped
def dbgWriteCan1(self, tri: int, address: int):
    """Send DBG_WRITE_CAN1.

    Parameters
    ----------
    tri : int
        Sent as one byte.
    address : int
        Sent as DWORD.

    Returns
    -------
    The result of the transport layer request.
    """
    payload = bytearray()
    for pack, value in ((self.BYTE_pack, tri), (self.DWORD_pack, address)):
        payload.extend(pack(value))
    return self.transport.request(types.Command.DBG_WRITE_CAN1, *payload)
@wrapped
def dbgWriteCan2(self, width: int, num: int):
    """Send DBG_WRITE_CAN2.

    `width` is remembered in `self._dbg_width` before the payload is built.

    Parameters
    ----------
    width : int
        Sent as one byte.
    num : int
        Sent as one byte.

    Returns
    -------
    The result of the transport layer request.
    """
    self._dbg_width = width
    payload = bytearray()
    payload.append(width)
    payload.extend(self.BYTE_pack(num))
    return self.transport.request(types.Command.DBG_WRITE_CAN2, *payload)
@wrapped
def dbgWriteCanNext(self, num: int, data: List[int]):
    """Send DBG_WRITE_CAN_NEXT.

    Payload layout: `num` (one byte), then the first `num` elements of
    `data`, each packed according to `self._dbg_width` (1 -> BYTE,
    2 -> WORD, 4 -> DWORD, 8 -> DLONG). For any other width no element is
    appended.

    Parameters
    ----------
    num : int
        Number of elements taken from `data`.
    data : List[int]
        Indexable collection of values; must hold at least `num` elements.

    Returns
    -------
    The result of the transport layer request.

    Note
    ----
    `self._dbg_width` is only assigned by commands taking a `width`
    argument (e.g. :meth:`dbgWriteCan2`), so one of them has to be
    executed first.
    """
    d = bytearray()
    d.extend(self.BYTE_pack(num))
    packer_names = {0x01: "BYTE_pack", 0x02: "WORD_pack", 0x04: "DWORD_pack", 0x08: "DLONG_pack"}
    for i in range(num):
        # Width is read per element; packers are resolved lazily by name.
        packer_name = packer_names.get(self._dbg_width)
        if packer_name is not None:
            d.extend(getattr(self, packer_name)(data[i]))
    return self.transport.request(types.Command.DBG_WRITE_CAN_NEXT, *d)
@wrapped
def dbgRead(self, tri: int, width: int, num: int, address: int):
    """Send DBG_READ.

    Payload layout: one zero byte, `tri` (byte), `width` (byte), `num`
    (WORD), `address` (DLONG). `width` is remembered in `self._dbg_width`.

    Parameters
    ----------
    tri : int
    width : int
    num : int
    address : int

    Returns
    -------
    `pyxcp.types.DbgReadResponse`
        Parsed with the slave's byte order and the given `width`.
    """
    payload = bytearray()
    payload.extend(b"\x00")
    payload.extend(self.BYTE_pack(tri))
    self._dbg_width = width
    for pack, value in ((self.BYTE_pack, width), (self.WORD_pack, num), (self.DLONG_pack, address)):
        payload.extend(pack(value))
    raw = self.transport.request(types.Command.DBG_READ, *payload)
    return types.DbgReadResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder, width=width)
@wrapped
def dbgReadCan1(self, tri: int, address: int):
    """Send DBG_READ_CAN1.

    Parameters
    ----------
    tri : int
        Sent as one byte.
    address : int
        Sent as DWORD.

    Returns
    -------
    The result of the transport layer request.
    """
    payload = bytearray()
    for pack, value in ((self.BYTE_pack, tri), (self.DWORD_pack, address)):
        payload.extend(pack(value))
    return self.transport.request(types.Command.DBG_READ_CAN1, *payload)
@wrapped
def dbgReadCan2(self, width: int, num: int):
    """Send DBG_READ_CAN2.

    `width` is remembered in `self._dbg_width` before the payload is built.

    Parameters
    ----------
    width : int
        Sent as one byte.
    num : int
        Sent as one byte.

    Returns
    -------
    The result of the transport layer request.
    """
    self._dbg_width = width
    payload = bytearray()
    for value in (width, num):
        payload.extend(self.BYTE_pack(value))
    return self.transport.request(types.Command.DBG_READ_CAN2, *payload)
@wrapped
def dbgGetTriDescTbl(self):
    """Send DBG_GET_TRI_DESC_TBL followed by five zero bytes.

    Returns
    -------
    `pyxcp.types.DbgGetTriDescTblResponse`
    """
    # Hand over the zero bytes as individual byte arguments, consistent with
    # the other commands, instead of one bytes object.
    response = self.transport.request(types.Command.DBG_GET_TRI_DESC_TBL, *b"\x00\x00\x00\x00\x00")
    return types.DbgGetTriDescTblResponse.parse(response, byteOrder=self.slaveProperties.byteOrder)
@wrapped
def dbgLlbt(self, data):
    """Send DBG_LLBT.

    Payload layout: one zero byte, the number of elements (WORD), then
    every element of `data` as one byte.

    Parameters
    ----------
    data : sized iterable of int

    Returns
    -------
    `pyxcp.types.DbgLlbtResponse`
    """
    d = bytearray()
    d.extend(b"\x00")
    d.extend(self.WORD_pack(len(data)))
    for b in data:
        d.extend(self.BYTE_pack(b))
    # Unpack the payload into single byte arguments, consistent with the
    # other commands, instead of handing over the bytearray as one argument.
    response = self.transport.request(types.Command.DBG_LLBT, *d)
    return types.DbgLlbtResponse.parse(response, byteOrder=self.slaveProperties.byteOrder)
@wrapped
def timeCorrelationProperties(self, setProperties, getPropertiesRequest, clusterId):
    """Send TIME_CORRELATION_PROPERTIES.

    Parameters
    ----------
    setProperties : int
    getPropertiesRequest : int
    clusterId : int
        Sent as a WORD after one zero byte.

    Returns
    -------
    `pyxcp.types.TimeCorrelationPropertiesResponse`
    """
    request_args = (setProperties, getPropertiesRequest, 0, *self.WORD_pack(clusterId))
    raw = self.transport.request(types.Command.TIME_CORRELATION_PROPERTIES, *request_args)
    parsed = types.TimeCorrelationPropertiesResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder)
    return parsed
# Transport layer commands / CAN.
@broadcasted
@wrapped
def getSlaveID(self, mode: int):
    """Send the transport layer command GET_SLAVE_ID.

    The request carries the ASCII codes of "X", "C", "P" followed by `mode`.

    Parameters
    ----------
    mode : int

    Returns
    -------
    Whatever :meth:`transportLayerCmd` returns for this command.
    """
    # The pattern is sent as integer byte values -- like every other command
    # argument in this class -- not as `str` objects.
    pattern = [ord(char) for char in "XCP"]
    return self.transportLayerCmd(types.TransportLayerCommands.GET_SLAVE_ID, *pattern, mode)
def getDaqId(self, daqListNumber: int):
    """Send the transport layer command GET_DAQ_ID for a DAQ list.

    Parameters
    ----------
    daqListNumber : int
        Sent as WORD.

    Returns
    -------
    `pyxcp.types.GetDaqIdResponse` or None
        The parsed response; `None` if the response is empty/falsy.
    """
    raw = self.transportLayerCmd(types.TransportLayerCommands.GET_DAQ_ID, *self.WORD_pack(daqListNumber))
    if not raw:
        return None
    return types.GetDaqIdResponse.parse(raw, byteOrder=self.slaveProperties.byteOrder)
def setDaqId(self, daqListNumber: int, identifier: int):
    """Send the transport layer command SET_DAQ_ID.

    Parameters
    ----------
    daqListNumber : int
        Sent as WORD.
    identifier : int
        Sent as DWORD.

    Returns
    -------
    Whatever :meth:`transportLayerCmd` returns for this command.
    """
    request_args = (*self.WORD_pack(daqListNumber), *self.DWORD_pack(identifier))
    return self.transportLayerCmd(types.TransportLayerCommands.SET_DAQ_ID, *request_args)
# Convenience Functions.
def verify(self, addr, length):
    """Convenience function for verification of a data-transfer from slave
    to master (Not part of the XCP Specification).

    The slave is asked to build a checksum over `length` elements starting
    at `addr`; the same range is then fetched and the checksum is
    calculated locally with the checksum type reported by the slave.

    Parameters
    ----------
    addr : int
    length : int

    Returns
    -------
    bool
        True if both checksums are equal.
    """
    self.setMta(addr)
    slave_result = self.buildChecksum(length)
    self.logger.debug(
        "BuildChecksum return'd: 0x{:08X} [{}]".format(slave_result.checksum, slave_result.checksumType)
    )
    # The MTA has to be set again before the data itself is read.
    self.setMta(addr)
    local_checksum = checksum.check(self.fetch(length), slave_result.checksumType)
    self.logger.debug("Our checksum : 0x{:08X}".format(local_checksum))
    return slave_result.checksum == local_checksum
def getDaqInfo(self):
    """Get DAQ information: processor, resolution, events.

    Returns
    -------
    dict
        - "processor": values taken from :meth:`getDaqProcessorInfo`.
        - "resolution": values taken from :meth:`getDaqResolutionInfo`.
        - "channels": one dictionary per event channel, built from
          :meth:`getDaqEventInfo` plus the channel name read via `fetch`.
    """
    dpi = self.getDaqProcessorInfo()
    property_keys = (
        ("configType", "daqConfigType"),
        ("overloadEvent", "overloadEvent"),
        ("overloadMsb", "overloadMsb"),
        ("prescalerSupported", "prescalerSupported"),
        ("pidOffSupported", "pidOffSupported"),
        ("timestampSupported", "timestampSupported"),
        ("bitStimSupported", "bitStimSupported"),
        ("resumeSupported", "resumeSupported"),
    )
    key_byte_keys = (
        ("identificationField", "Identification_Field"),
        ("addressExtension", "Address_Extension"),
        ("optimisationType", "Optimisation_Type"),
    )
    info = {
        "processor": {
            "minDaq": dpi["minDaq"],
            "maxDaq": dpi["maxDaq"],
            "properties": {target: dpi["daqProperties"][source] for target, source in property_keys},
            "keyByte": {target: dpi["daqKeyByte"][source] for target, source in key_byte_keys},
        }
    }

    dri = self.getDaqResolutionInfo()
    resolution = {
        key: dri[key]
        for key in (
            "timestampTicks",
            "maxOdtEntrySizeDaq",
            "maxOdtEntrySizeStim",
            "granularityOdtEntrySizeDaq",
            "granularityOdtEntrySizeStim",
        )
    }
    resolution["timestampMode"] = {key: dri["timestampMode"][key] for key in ("unit", "fixed", "size")}
    info["resolution"] = resolution

    event_channels = []
    for channel_number in range(dpi.maxEventChannel):
        eci = self.getDaqEventInfo(channel_number)
        # The channel name is read right after the event info request.
        name = self.fetch(eci.eventChannelNameLength)
        if name:
            name = decode_bytes(name)
        event_properties = eci["daqEventProperties"]
        event_channels.append(
            {
                "name": name,
                "priority": eci["eventChannelPriority"],
                "unit": eci["eventChannelTimeUnit"],
                "cycle": eci["eventChannelTimeCycle"],
                "maxDaqList": eci["maxDaqList"],
                "properties": {key: event_properties[key] for key in ("consistency", "daq", "stim", "packed")},
            }
        )
    info["channels"] = event_channels
    return info
def getCurrentProtectionStatus(self):
    """Return the cached resource protection status.

    If nothing is cached yet (`self.currentProtectionStatus` is `None`),
    the status is requested with :meth:`getStatus` and stored through
    :meth:`_setProtectionStatus` first.

    Returns
    -------
    The value of `self.currentProtectionStatus`.
    """
    if self.currentProtectionStatus is not None:
        return self.currentProtectionStatus
    self._setProtectionStatus(self.getStatus().resourceProtectionStatus)
    return self.currentProtectionStatus
def _setProtectionStatus(self, protection):
    """Store the protection flags of `protection` as a dictionary.

    Parameters
    ----------
    protection :
        Object providing the attributes `dbg`, `pgm`, `stim`, `daq` and
        `calpag`; their values are copied to
        `self.currentProtectionStatus` under the same names.
    """
    resource_names = ("dbg", "pgm", "stim", "daq", "calpag")
    self.currentProtectionStatus = {name: getattr(protection, name) for name in resource_names}
def cond_unlock(self, resources=None):
    """Conditionally unlock resources, i.e. only unlock locked resources.

    Precondition: Parameter "SEED_N_KEY_DLL" must be present and point to a valid DLL/SO.

    Parameters
    ----------
    resources: str
        Comma or space separated list of resources, e.g. "DAQ, CALPAG".
        The names are not case-sensitive.
        Valid identifiers are: "calpag", "daq", "dbg", "pgm", "stim".

        If omitted, try to unlock every resource the slave reports as
        supported ("calpag", "daq", "stim", "pgm").

    Raises
    ------
    RuntimeError
        No seed and key DLL is configured.
    ValueError
        Invalid resource name.
    `dllif.SeedNKeyError`
        In case of DLL related issues.
    """
    import re

    from pyxcp.dllif import getKey, SeedNKeyResult, SeedNKeyError

    MAX_PAYLOAD = self.slaveProperties["maxCto"] - 2

    if not self.seedNKeyDLL:
        raise RuntimeError("No seed and key DLL specified, cannot proceed.")

    if resources is None:
        supported = []
        for flag, resource in (
            ("supportsCalpag", "calpag"),
            ("supportsDaq", "daq"),
            ("supportsStim", "stim"),
            ("supportsPgm", "pgm"),
        ):
            if self.slaveProperties[flag]:
                supported.append(resource)
        resources = ",".join(supported)

    protection_status = self.getCurrentProtectionStatus()
    resource_names = [token.lower() for token in re.split(r"[ ,]", resources) if token]
    for name in resource_names:
        if name not in types.RESOURCE_VALUES:
            raise ValueError("Invalid resource name '{}'.".format(name))
        if not protection_status[name]:
            continue  # Resource is not locked, nothing to do.
        resource_value = types.RESOURCE_VALUES[name]

        seed_part = self.getSeed(types.XcpGetSeedMode.FIRST_PART, resource_value)
        seed = list(seed_part.seed)
        length = seed_part.length
        if length == 0:
            continue
        # Collect the remaining parts until the announced seed length is reached.
        while len(seed) < length:
            seed_part = self.getSeed(types.XcpGetSeedMode.REMAINING, resource_value)
            seed.extend(list(seed_part.seed))
        seed = seed[:length]  # maybe there are some padding bytes

        status, key = getKey(
            self.logger,
            self.seedNKeyDLL,
            resource_value,
            bytes(seed),
            self.seedNKeyDLL_same_bit_width,
        )
        if status != SeedNKeyResult.ACK:
            raise SeedNKeyError("SeedAndKey DLL returned: {}".format(SeedNKeyResult(status).name))

        # Send the key in chunks of at most MAX_PAYLOAD bytes; the first
        # argument of `unlock` is the number of key bytes still outstanding.
        key = list(key)
        total_length = len(key)
        offset = 0
        while offset < total_length:
            chunk = key[offset : offset + MAX_PAYLOAD]
            self.unlock(total_length - offset, chunk)
            offset += len(chunk)
def identifier(self, id_value: int) -> str:
    """Return the identifier for the given value.

    Use this method instead of calling `getId()` directly.

    If bit 0 of the response's `mode` is set, the identification is taken
    from the response itself, otherwise `length` elements are read with
    `fetch`.

    Parameters
    ----------
    id_value: int
        For standard identifiers, use the constants from `pyxcp.types.XcpGetIdType`.

    Returns
    -------
    str
    """
    gid = self.getId(id_value)
    if (gid.mode & 0x01) != 0x01:
        raw = self.fetch(gid.length)
    else:
        raw = bytes(gid.identification or b"")
    return decode_bytes(raw)
def id_scanner(self, scan_ranges: Optional[Collection[Collection[int]]] = None) -> Dict[str, str]:
    """Scan for available standard identification types (GET_ID).

    Parameters
    ----------
    scan_ranges: Optional[Collection[Collection[int]]]

        - If parameter is omitted or `None` test every standard identification type (s. GET_ID service)
          plus extensions by Vector Informatik.

        - Else `scan_ranges` must be a list-of-list.
            e.g: [[12, 80], [123], [240, 16, 35]]

            - The first list is a range (closed interval).
            - The second is a single value.
            - The third is a value list.

    Returns
    -------
    Dict[str, str]
        Maps the name of every identification type that yielded a
        non-empty answer to that answer. Values without a standard name
        are called "USER_<n>", `n` being the position in the scan order.

    Raises
    ------
    TypeError
        `scan_ranges` or one of its elements is not a `Collection`.
    ValueError
        An element of `scan_ranges` is empty.
    """
    std_ids = {int(member): label for label, member in types.XcpGetIdType.__members__.items()}

    # The complete scan order is validated and built before the first request is sent.
    if scan_ranges is None:
        candidates = std_ids.keys()
    else:
        if not isinstance(scan_ranges, Collection):
            raise TypeError("scan_ranges must be of type `Collection`")
        collected = []
        for element in scan_ranges:
            if not isinstance(element, Collection):
                raise TypeError("scan_ranges elements must be of type `Collection`")
            if not element:
                raise ValueError("scan_ranges elements cannot be empty")
            element_size = len(element)
            if element_size == 1:
                collected.append(element[0])  # Single value
            elif element_size == 2:
                first, last = element  # Value range
                collected.extend(list(range(first, last + 1)))
            else:
                collected.extend(element)  # Value list.
        candidates = sorted(frozenset(collected))

    found = {}
    for position, id_value in enumerate(candidates):
        label = std_ids[id_value] if id_value in std_ids else f"USER_{position}"
        try:
            answer = self.identifier(id_value)
        except types.XcpResponseError:
            # don't depend on confirming implementation, i.e.: ID not implemented ==> empty response.
            continue
        if answer:
            found[label] = answer
    return found
def try_command(self, cmd: Callable, *args, **kws) -> Tuple[types.TryCommandResult, Any]:
    """Call master functions and handle XCP errors more gracefully.

    Parameter
    ---------
    cmd: Callable

    args: list
        variable length arguments to `cmd`.

    kws: dict
        keyword arguments to `cmd`.

        `extra_msg`: str
            Additional info to log message (not passed to `cmd`).

    Returns
    -------
    Tuple[types.TryCommandResult, Any]
        - `(OK, result)` if `cmd` returned normally.
        - `(XCP_ERROR, exception)` if `cmd` raised `SystemExit`.
        - `(OTHER_ERROR, exception)` if `cmd` raised any other `Exception`.

    Note
    ----
    Mainly used for plug-and-play applications, e.g. `id_scanner` may confronted with `ERR_OUT_OF_RANGE` errors, which
    is normal for this kind of applications -- or to test for optional commands.
    Use carefully not to hide serious error causes.
    """
    # Always remove `extra_msg` -- even if it is empty or `None` -- so it is never forwarded to `cmd`.
    extra_msg: Optional[str] = kws.pop("extra_msg", None)
    try:
        res = cmd(*args, **kws)
    except SystemExit as e:
        # Not every `SystemExit` carries an `error_code`; a missing attribute must not mask the error.
        if getattr(e, "error_code", None) == types.XcpError.ERR_CMD_UNKNOWN:
            # This is a rather common use-case, so let the user know that there is some functionality missing.
            cmd_name = getattr(cmd, "__name__", cmd)  # e.g. `functools.partial` objects have no `__name__`.
            if extra_msg:
                self.logger.warning(f"Optional command {cmd_name!r} not implemented -- {extra_msg!r}")
            else:
                self.logger.warning(f"Optional command {cmd_name!r} not implemented.")
        return (types.TryCommandResult.XCP_ERROR, e)
    except Exception as e:
        return (types.TryCommandResult.OTHER_ERROR, e)
    else:
        return (types.TryCommandResult.OK, res)
def ticks_to_seconds(ticks, resolution):
    """Convert DAQ timestamp/tick value to seconds.

    Deprecated: emits a warning on every call, use the factory
    :func:`make_tick_converter` instead.

    Parameters
    ----------
    ticks: int

    resolution: `GetDaqResolutionInfoResponse` as returned by :meth:`getDaqResolutionInfo`

    Returns
    -------
    `ticks` multiplied by `resolution.timestampTicks` and by ten to the
    power of the exponent belonging to the timestamp unit.
    """
    warnings.warn(
        "ticks_to_seconds() deprecated, use factory :func:`make_tick_converter` instead.",
        Warning,
    )
    exponent = types.DAQ_TIMESTAMP_UNIT_TO_EXP[resolution.timestampMode.unit]
    seconds_per_tick = (10**exponent) * resolution.timestampTicks
    return seconds_per_tick * ticks
def make_tick_converter(resolution):
    """Make a function that converts tick count from XCP slave to seconds.

    The conversion factor is computed once, when the converter is created.

    Parameters
    ----------
    resolution: `GetDaqResolutionInfoResponse` as returned by :meth:`getDaqResolutionInfo`

    Returns
    -------
    Callable
        Function taking a tick count and returning it multiplied by the
        conversion factor.
    """
    unit_exponent = types.DAQ_TIMESTAMP_UNIT_TO_EXP[resolution.timestampMode.unit]
    seconds_per_tick = (10**unit_exponent) * resolution.timestampTicks

    def ticks_to_seconds(ticks):
        """Convert DAQ timestamp/tick value to seconds.

        Parameters
        ----------
        ticks: int

        Returns
        -------
        float
        """
        return seconds_per_tick * ticks

    return ticks_to_seconds