# example/django_example/consumer.py
from django.core.serializers.json import DjangoJSONEncoder
from channels_jsonrpc import JsonRpcConsumerTest
# import the logging library
import logging
# Get an instance of a logger
logger = logging.getLogger(__name__)
class MyJsonRpcWebsocketConsumerTest(JsonRpcConsumerTest):
    """Example JSON-RPC websocket consumer built on ``JsonRpcConsumerTest``."""

    # Ordering flags: set to True if you want them, else leave out
    strict_ordering = False
    slight_ordering = False

    # Set to True to automatically port users from HTTP cookies
    # (you don't need channel_session_user, this implies it)
    # https://channels.readthedocs.io/en/stable/generics.html#websockets
    http_user = True

    def connection_groups(self, **kwargs):
        """
        Return the list of groups this connection is automatically
        added to / removed from.

        :param kwargs: extra keyword arguments; not used here.
        :return: a list holding the single group name "test".
        """
        groups = ["test"]
        return groups

    def connect(self, message, **kwargs):
        """
        Perform things on connection start: accept the connection, then log it.

        :param message: the incoming message; not read here, the method
            uses ``self.message`` instead.
        :param kwargs: extra keyword arguments; not used here.
        """
        # Send the accept reply on the reply channel of the stored message.
        reply_channel = self.message.reply_channel
        reply_channel.send({"accept": True})
        logger.info("connect")
        # Do stuff if needed

    def disconnect(self, message, **kwargs):
        """
        Perform things on connection close: only logs the event.

        :param message: the incoming message; not used here.
        :param kwargs: extra keyword arguments; not used here.
        """
        logger.info("disconnect")
        # Do stuff if needed

    def process(cls, data, original_msg):
        """
        Forward the call to the private ``__process`` hook
        (made to test thread-safety).

        NOTE(review): no ``@classmethod`` decorator is visible here, so
        when this is called on an instance ``cls`` is that instance.
        NOTE(review): inside this class body Python rewrites
        ``cls.__process`` to ``cls._MyJsonRpcWebsocketConsumerTest__process``
        (private name mangling) -- confirm that attribute really exists.

        :param data: passed through unchanged.
        :param original_msg: passed through unchanged.
        :return: whatever the ``__process`` hook returns.
        """
        delegate = cls.__process
        return delegate(data, original_msg)
@MyJsonRpcWebsocketConsumerTest.rpc_method()
def ping(fake_an_error, **kwargs):
    """
    Example RPC method: answer "pong", or fail on demand.

    Decorated with ``MyJsonRpcWebsocketConsumerTest.rpc_method()``.

    :param fake_an_error: when truthy, raise instead of answering.
    :param kwargs: extra keyword arguments; accepted and ignored.
    :return: the string "pong" when ``fake_an_error`` is falsy.
    :raises Exception: with the message "fake_error" when
        ``fake_an_error`` is truthy.
    """
    if fake_an_error:
        # Will return an error to the client
        # --> {"id":1, "jsonrpc":"2.0","method":"mymodule.rpc.ping","params":{}}
        # <-- {"id": 1, "jsonrpc": "2.0", "error": {"message": "fake_error", "code": -32000, "data": ["fake_error"]}}
        # The exception must carry "fake_error" so that its message and
        # args match the error reply documented just above.
        raise Exception("fake_error")

    # Will return a result to the client
    # --> {"id":1, "jsonrpc":"2.0","method":"mymodule.rpc.ping","params":{}}
    # <-- {"id": 1, "jsonrpc": "2.0", "result": "pong"}
    return "pong"
class DjangoJsonRpcWebsocketConsumerTest(JsonRpcConsumerTest):
    """``JsonRpcConsumerTest`` variant whose ``json_encoder_class`` is ``DjangoJSONEncoder``."""

    # Encoder class attribute; set here to Django's JSON encoder.
    json_encoder_class = DjangoJSONEncoder