src/SimplePbjx.php
<?php
declare(strict_types=1);
namespace Gdbots\Pbjx;
use Gdbots\Pbj\Message;
use Gdbots\Pbj\Schema;
use Gdbots\Pbj\Util\NumberUtil;
use Gdbots\Pbjx\Event\BusExceptionEvent;
use Gdbots\Pbjx\Event\GetResponseEvent;
use Gdbots\Pbjx\Event\PbjxEvent;
use Gdbots\Pbjx\Event\ResponseCreatedEvent;
use Gdbots\Pbjx\EventSearch\EventSearch;
use Gdbots\Pbjx\EventStore\EventStore;
use Gdbots\Pbjx\Exception\InvalidArgumentException;
use Gdbots\Pbjx\Exception\LogicException;
use Gdbots\Pbjx\Exception\RequestHandlingFailed;
use Gdbots\Pbjx\Exception\TooMuchRecursion;
use Gdbots\Schemas\Pbjx\Request\RequestFailedResponseV1;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
final class SimplePbjx implements Pbjx
{
private EventDispatcherInterface $dispatcher;
private ServiceLocator $locator;
private int $maxRecursion;
public function __construct(ServiceLocator $locator, int $maxRecursion = 10)
{
$this->locator = $locator;
$this->dispatcher = $this->locator->getDispatcher();
$this->maxRecursion = NumberUtil::bound($maxRecursion, 2, 10);
PbjxEvent::setPbjx($this);
}
public function trigger(Message $message, string $suffix, ?PbjxEvent $event = null, bool $recursive = true, bool $throw = true): static
{
$suffix = '.' . trim($suffix, '.');
if ('.' === $suffix) {
throw new InvalidArgumentException('Trigger requires a non-empty suffix.');
}
$event = $event ?: new PbjxEvent($message);
$schema = $message::schema();
if ($event->getDepth() > $this->maxRecursion) {
throw new TooMuchRecursion(sprintf(
'Pbjx::trigger encountered a schema that is too complex ' .
'or a nested message is being referenced multiple times in ' .
'the same tree. Max recursion: %d, Current schema is "%s".',
$this->maxRecursion,
$schema->getId()->toString()
));
}
if ($recursive && $event->supportsRecursion()) {
foreach ($this->getNestedMessages($message, $schema) as $nestedMessage) {
if ($nestedMessage->isFrozen()) {
continue;
}
$this->trigger($nestedMessage, $suffix, $event->createChildEvent($nestedMessage), $recursive);
}
}
$eventNames = ['gdbots_pbjx.message' . $suffix];
foreach ($schema->getMixins() as $mixin) {
$eventNames[] = $mixin . $suffix;
}
$eventNames[] = $schema->getCurieMajor() . $suffix;
$eventNames[] = $schema->getCurie()->toString() . $suffix;
foreach ($eventNames as $eventName) {
try {
$this->dispatcher->dispatch($event, $eventName);
} catch (\Throwable $e) {
if ($throw) {
throw $e;
}
$this->locator->getExceptionHandler()->onTriggerException($event, $eventName, $e);
}
}
return $this;
}
public function triggerLifecycle(Message $message, bool $recursive = true): static
{
if ($message->isFrozen()) {
return $this;
}
$event = new PbjxEvent($message);
$this->trigger($message, PbjxEvents::SUFFIX_BIND, $event, $recursive);
$this->trigger($message, PbjxEvents::SUFFIX_VALIDATE, $event, $recursive);
$this->trigger($message, PbjxEvents::SUFFIX_ENRICH, $event, $recursive);
return $this;
}
public function copyContext(Message $from, Message $to): static
{
if ($to->isFrozen()) {
return $this;
}
$schema = $to::schema();
if (!$to->has('ctx_causator_ref') && $schema->hasField('ctx_causator_ref')) {
$to->set('ctx_causator_ref', $from->generateMessageRef());
}
$clone = ['ctx_app', 'ctx_cloud'];
foreach ($clone as $field) {
if (!$to->has($field) && $from->has($field) && $schema->hasField($field)) {
$to->set($field, clone $from->get($field));
}
}
$simple = [
'ctx_tenant_id',
'ctx_correlator_ref',
'ctx_user_ref',
'ctx_ip',
'ctx_ipv6',
'ctx_ua',
'ctx_msg',
];
foreach ($simple as $field) {
if (!$to->has($field) && $from->has($field) && $schema->hasField($field)) {
$to->set($field, $from->get($field));
}
}
return $this;
}
public function send(Message $command): void
{
if (!$command::schema()->hasMixin('gdbots:pbjx:mixin:command')) {
throw new LogicException('Pbjx->send requires a message using "gdbots:pbjx:mixin:command".');
}
$this->triggerLifecycle($command);
$this->locator->getCommandBus()->send($command);
}
public function sendAt(Message $command, int $timestamp, ?string $jobId = null, array $context = []): string
{
if ($timestamp <= time()) {
throw new LogicException('Pbjx->sendAt requires a timestamp in the future.');
}
if (!$command::schema()->hasMixin('gdbots:pbjx:mixin:command')) {
throw new LogicException('Pbjx->sendAt requires a message using "gdbots:pbjx:mixin:command".');
}
$this->triggerLifecycle($command);
$command->freeze();
return $this->locator->getScheduler()->sendAt($command, $timestamp, $jobId, $context);
}
public function cancelJobs(array $jobIds, array $context = []): void
{
$this->locator->getScheduler()->cancelJobs($jobIds, $context);
}
public function publish(Message $event): void
{
if (!$event::schema()->hasMixin('gdbots:pbjx:mixin:event')) {
throw new LogicException('Pbjx->publish requires a message using "gdbots:pbjx:mixin:event".');
}
$this->triggerLifecycle($event);
$this->locator->getEventBus()->publish($event);
}
public function request(Message $request): Message
{
if (!$request::schema()->hasMixin('gdbots:pbjx:mixin:request')) {
throw new LogicException('Pbjx->request requires a message using "gdbots:pbjx:mixin:request".');
}
$this->triggerLifecycle($request);
$event = new GetResponseEvent($request);
$this->trigger($request, PbjxEvents::SUFFIX_BEFORE_HANDLE, $event, false);
if ($event->hasResponse()) {
return $event->getResponse();
}
$response = $this->locator->getRequestBus()->request($request);
$event->setResponse($response);
if ($response instanceof RequestFailedResponseV1) {
throw new RequestHandlingFailed($response);
}
try {
$event = new ResponseCreatedEvent($request, $response);
$this->trigger($request, PbjxEvents::SUFFIX_AFTER_HANDLE, $event, false);
$this->trigger($response, PbjxEvents::SUFFIX_CREATED, $event, false);
} catch (\Throwable $e) {
$this->locator->getExceptionHandler()->onRequestBusException(new BusExceptionEvent($response, $e));
throw $e;
}
return $response;
}
public function getEventStore(): EventStore
{
return $this->locator->getEventStore();
}
public function getEventSearch(): EventSearch
{
return $this->locator->getEventSearch();
}
/**
* @param Message $message
* @param Schema $schema
*
* @return Message[]
*/
private function getNestedMessages(Message $message, Schema $schema): array
{
$messages = [];
foreach ($schema->getFields() as $field) {
if (!$field->getType()->isMessage()) {
continue;
}
if (!$message->has($field->getName())) {
continue;
}
if ($field->isASingleValue()) {
$messages[] = $message->fget($field->getName());
} else {
$messages = array_merge($messages, $message->fget($field->getName()));
}
}
return $messages;
}
}