enclose-io/compiler

View on GitHub
lts/src/node_messaging.cc

Summary

Maintainability
Test Coverage
#include "node_messaging.h"

#include "async_wrap-inl.h"
#include "debug_utils-inl.h"
#include "memory_tracker-inl.h"
#include "node_contextify.h"
#include "node_buffer.h"
#include "node_errors.h"
#include "node_process.h"
#include "util-inl.h"

using node::contextify::ContextifyContext;
using v8::Array;
using v8::ArrayBuffer;
using v8::ArrayBufferCreationMode;
using v8::Context;
using v8::EscapableHandleScope;
using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::Global;
using v8::HandleScope;
using v8::Isolate;
using v8::Just;
using v8::Local;
using v8::Maybe;
using v8::MaybeLocal;
using v8::Nothing;
using v8::Object;
using v8::SharedArrayBuffer;
using v8::String;
using v8::Symbol;
using v8::Value;
using v8::ValueDeserializer;
using v8::ValueSerializer;
using v8::WasmModuleObject;

namespace node {
namespace worker {

Message::Message(MallocedBuffer<char>&& buffer)
    : main_message_buf_(std::move(buffer)) {}

bool Message::IsCloseMessage() const {
  return main_message_buf_.data == nullptr;
}

namespace {

// This is used to tell V8 how to read transferred host objects, like other
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
class DeserializerDelegate : public ValueDeserializer::Delegate {
 public:
  DeserializerDelegate(
      Message* m,
      Environment* env,
      const std::vector<MessagePort*>& message_ports,
      const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers,
      const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules)
      : message_ports_(message_ports),
        shared_array_buffers_(shared_array_buffers),
        wasm_modules_(wasm_modules) {}

  MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
    // Currently, only MessagePort hosts objects are supported, so identifying
    // by the index in the message's MessagePort array is sufficient.
    uint32_t id;
    if (!deserializer->ReadUint32(&id))
      return MaybeLocal<Object>();
    CHECK_LE(id, message_ports_.size());
    return message_ports_[id]->object(isolate);
  }

  MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
      Isolate* isolate, uint32_t clone_id) override {
    CHECK_LE(clone_id, shared_array_buffers_.size());
    return shared_array_buffers_[clone_id];
  }

  MaybeLocal<WasmModuleObject> GetWasmModuleFromId(
      Isolate* isolate, uint32_t transfer_id) override {
    CHECK_LE(transfer_id, wasm_modules_.size());
    return WasmModuleObject::FromTransferrableModule(
        isolate, wasm_modules_[transfer_id]);
  }

  ValueDeserializer* deserializer = nullptr;

 private:
  const std::vector<MessagePort*>& message_ports_;
  const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
  const std::vector<WasmModuleObject::TransferrableModule>& wasm_modules_;
};

}  // anonymous namespace

MaybeLocal<Value> Message::Deserialize(Environment* env,
                                       Local<Context> context) {
  CHECK(!IsCloseMessage());

  EscapableHandleScope handle_scope(env->isolate());
  Context::Scope context_scope(context);

  // Create all necessary MessagePort handles.
  std::vector<MessagePort*> ports(message_ports_.size());
  for (uint32_t i = 0; i < message_ports_.size(); ++i) {
    ports[i] = MessagePort::New(env,
                                context,
                                std::move(message_ports_[i]));
    if (ports[i] == nullptr) {
      for (MessagePort* port : ports) {
        // This will eventually release the MessagePort object itself.
        if (port != nullptr)
          port->Close();
      }
      return MaybeLocal<Value>();
    }
  }
  message_ports_.clear();

  std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
  // Attach all transferred SharedArrayBuffers to their new Isolate.
  for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
    Local<SharedArrayBuffer> sab;
    if (!shared_array_buffers_[i]->GetSharedArrayBuffer(env, context)
            .ToLocal(&sab))
      return MaybeLocal<Value>();
    shared_array_buffers.push_back(sab);
  }
  shared_array_buffers_.clear();

  DeserializerDelegate delegate(
      this, env, ports, shared_array_buffers, wasm_modules_);
  ValueDeserializer deserializer(
      env->isolate(),
      reinterpret_cast<const uint8_t*>(main_message_buf_.data),
      main_message_buf_.size,
      &delegate);
  delegate.deserializer = &deserializer;

  // Attach all transferred ArrayBuffers to their new Isolate.
  for (uint32_t i = 0; i < array_buffer_contents_.size(); ++i) {
    if (!env->isolate_data()->uses_node_allocator()) {
      // We don't use Node's allocator on the receiving side, so we have
      // to create the ArrayBuffer from a copy of the memory.
      AllocatedBuffer buf =
          env->AllocateManaged(array_buffer_contents_[i].size);
      memcpy(buf.data(),
             array_buffer_contents_[i].data,
             array_buffer_contents_[i].size);
      deserializer.TransferArrayBuffer(i, buf.ToArrayBuffer());
      continue;
    }

    env->isolate_data()->node_allocator()->RegisterPointer(
        array_buffer_contents_[i].data, array_buffer_contents_[i].size);

    Local<ArrayBuffer> ab =
        ArrayBuffer::New(env->isolate(),
                         array_buffer_contents_[i].release(),
                         array_buffer_contents_[i].size,
                         ArrayBufferCreationMode::kInternalized);
    deserializer.TransferArrayBuffer(i, ab);
  }
  array_buffer_contents_.clear();

  if (deserializer.ReadHeader(context).IsNothing())
    return MaybeLocal<Value>();
  return handle_scope.Escape(
      deserializer.ReadValue(context).FromMaybe(Local<Value>()));
}

void Message::AddSharedArrayBuffer(
    const SharedArrayBufferMetadataReference& reference) {
  shared_array_buffers_.push_back(reference);
}

void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
  message_ports_.emplace_back(std::move(data));
}

uint32_t Message::AddWASMModule(WasmModuleObject::TransferrableModule&& mod) {
  wasm_modules_.emplace_back(std::move(mod));
  return wasm_modules_.size() - 1;
}

namespace {

MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
  Isolate* isolate = context->GetIsolate();
  Local<Object> per_context_bindings;
  Local<Value> emit_message_val;
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
      !per_context_bindings->Get(context,
                                FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
          .ToLocal(&emit_message_val)) {
    return MaybeLocal<Function>();
  }
  CHECK(emit_message_val->IsFunction());
  return emit_message_val.As<Function>();
}

MaybeLocal<Function> GetDOMException(Local<Context> context) {
  Isolate* isolate = context->GetIsolate();
  Local<Object> per_context_bindings;
  Local<Value> domexception_ctor_val;
  if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
      !per_context_bindings->Get(context,
                                FIXED_ONE_BYTE_STRING(isolate, "DOMException"))
          .ToLocal(&domexception_ctor_val)) {
    return MaybeLocal<Function>();
  }
  CHECK(domexception_ctor_val->IsFunction());
  Local<Function> domexception_ctor = domexception_ctor_val.As<Function>();
  return domexception_ctor;
}

void ThrowDataCloneException(Local<Context> context, Local<String> message) {
  Isolate* isolate = context->GetIsolate();
  Local<Value> argv[] = {message,
                         FIXED_ONE_BYTE_STRING(isolate, "DataCloneError")};
  Local<Value> exception;
  Local<Function> domexception_ctor;
  if (!GetDOMException(context).ToLocal(&domexception_ctor) ||
      !domexception_ctor->NewInstance(context, arraysize(argv), argv)
           .ToLocal(&exception)) {
    return;
  }
  isolate->ThrowException(exception);
}

// This tells V8 how to serialize objects that it does not understand
// (e.g. C++ objects) into the output buffer, in a way that our own
// DeserializerDelegate understands how to unpack.
class SerializerDelegate : public ValueSerializer::Delegate {
 public:
  SerializerDelegate(Environment* env, Local<Context> context, Message* m)
      : env_(env), context_(context), msg_(m) {}

  void ThrowDataCloneError(Local<String> message) override {
    ThrowDataCloneException(context_, message);
  }

  Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
    if (env_->message_port_constructor_template()->HasInstance(object)) {
      return WriteMessagePort(Unwrap<MessagePort>(object));
    }

    ThrowDataCloneError(env_->clone_unsupported_type_str());
    return Nothing<bool>();
  }

  Maybe<uint32_t> GetSharedArrayBufferId(
      Isolate* isolate,
      Local<SharedArrayBuffer> shared_array_buffer) override {
    uint32_t i;
    for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
      if (PersistentToLocal::Strong(seen_shared_array_buffers_[i]) ==
          shared_array_buffer) {
        return Just(i);
      }
    }

    auto reference = SharedArrayBufferMetadata::ForSharedArrayBuffer(
        env_,
        context_,
        shared_array_buffer);
    if (!reference) {
      return Nothing<uint32_t>();
    }
    seen_shared_array_buffers_.emplace_back(
      Global<SharedArrayBuffer> { isolate, shared_array_buffer });
    msg_->AddSharedArrayBuffer(reference);
    return Just(i);
  }

  Maybe<uint32_t> GetWasmModuleTransferId(
      Isolate* isolate, Local<WasmModuleObject> module) override {
    return Just(msg_->AddWASMModule(module->GetTransferrableModule()));
  }

  void Finish() {
    // Only close the MessagePort handles and actually transfer them
    // once we know that serialization succeeded.
    for (MessagePort* port : ports_) {
      port->Close();
      msg_->AddMessagePort(port->Detach());
    }
  }

  ValueSerializer* serializer = nullptr;

 private:
  Maybe<bool> WriteMessagePort(MessagePort* port) {
    for (uint32_t i = 0; i < ports_.size(); i++) {
      if (ports_[i] == port) {
        serializer->WriteUint32(i);
        return Just(true);
      }
    }

    THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
    return Nothing<bool>();
  }

  Environment* env_;
  Local<Context> context_;
  Message* msg_;
  std::vector<Global<SharedArrayBuffer>> seen_shared_array_buffers_;
  std::vector<MessagePort*> ports_;

  friend class worker::Message;
};

}  // anonymous namespace

Maybe<bool> Message::Serialize(Environment* env,
                               Local<Context> context,
                               Local<Value> input,
                               const TransferList& transfer_list_v,
                               Local<Object> source_port) {
  HandleScope handle_scope(env->isolate());
  Context::Scope context_scope(context);

  // Verify that we're not silently overwriting an existing message.
  CHECK(main_message_buf_.is_empty());

  SerializerDelegate delegate(env, context, this);
  ValueSerializer serializer(env->isolate(), &delegate);
  delegate.serializer = &serializer;

  std::vector<Local<ArrayBuffer>> array_buffers;
  for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
    Local<Value> entry = transfer_list_v[i];
    // Currently, we support ArrayBuffers and MessagePorts.
    if (entry->IsArrayBuffer()) {
      Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
      // If we cannot render the ArrayBuffer unusable in this Isolate and
      // take ownership of its memory, copying the buffer will have to do.
      if (!ab->IsDetachable() || ab->IsExternal() ||
          !env->isolate_data()->uses_node_allocator()) {
        continue;
      }
      // See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
      // for details.
      bool untransferrable;
      if (!ab->HasPrivate(
              context,
              env->arraybuffer_untransferable_private_symbol())
              .To(&untransferrable)) {
        return Nothing<bool>();
      }
      if (untransferrable) continue;
      if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
          array_buffers.end()) {
        ThrowDataCloneException(
            context,
            FIXED_ONE_BYTE_STRING(
                env->isolate(),
                "Transfer list contains duplicate ArrayBuffer"));
        return Nothing<bool>();
      }
      // We simply use the array index in the `array_buffers` list as the
      // ID that we write into the serialized buffer.
      uint32_t id = array_buffers.size();
      array_buffers.push_back(ab);
      serializer.TransferArrayBuffer(id, ab);
      continue;
    } else if (env->message_port_constructor_template()
                  ->HasInstance(entry)) {
      // Check if the source MessagePort is being transferred.
      if (!source_port.IsEmpty() && entry == source_port) {
        ThrowDataCloneException(
            context,
            FIXED_ONE_BYTE_STRING(env->isolate(),
                                  "Transfer list contains source port"));
        return Nothing<bool>();
      }
      MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
      if (port == nullptr || port->IsDetached()) {
        ThrowDataCloneException(
            context,
            FIXED_ONE_BYTE_STRING(
                env->isolate(),
                "MessagePort in transfer list is already detached"));
        return Nothing<bool>();
      }
      if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
          delegate.ports_.end()) {
        ThrowDataCloneException(
            context,
            FIXED_ONE_BYTE_STRING(
                env->isolate(),
                "Transfer list contains duplicate MessagePort"));
        return Nothing<bool>();
      }
      delegate.ports_.push_back(port);
      continue;
    }

    THROW_ERR_INVALID_TRANSFER_OBJECT(env);
    return Nothing<bool>();
  }

  serializer.WriteHeader();
  if (serializer.WriteValue(context, input).IsNothing()) {
    return Nothing<bool>();
  }

  for (Local<ArrayBuffer> ab : array_buffers) {
    // If serialization succeeded, we want to take ownership of
    // (a.k.a. externalize) the underlying memory region and render
    // it inaccessible in this Isolate.
    ArrayBuffer::Contents contents = ab->Externalize();
    ab->Detach();

    CHECK(env->isolate_data()->uses_node_allocator());
    env->isolate_data()->node_allocator()->UnregisterPointer(
        contents.Data(), contents.ByteLength());

    array_buffer_contents_.emplace_back(MallocedBuffer<char>{
        static_cast<char*>(contents.Data()), contents.ByteLength()});
  }

  delegate.Finish();

  // The serializer gave us a buffer allocated using `malloc()`.
  std::pair<uint8_t*, size_t> data = serializer.Release();
  CHECK_NOT_NULL(data.first);
  main_message_buf_ =
      MallocedBuffer<char>(reinterpret_cast<char*>(data.first), data.second);
  return Just(true);
}

void Message::MemoryInfo(MemoryTracker* tracker) const {
  tracker->TrackField("array_buffer_contents", array_buffer_contents_);
  tracker->TrackFieldWithSize("shared_array_buffers",
      shared_array_buffers_.size() * sizeof(shared_array_buffers_[0]));
  tracker->TrackField("message_ports", message_ports_);
}

MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }

MessagePortData::~MessagePortData() {
  CHECK_NULL(owner_);
  Disentangle();
}

void MessagePortData::MemoryInfo(MemoryTracker* tracker) const {
  Mutex::ScopedLock lock(mutex_);
  tracker->TrackField("incoming_messages", incoming_messages_);
}

void MessagePortData::AddToIncomingQueue(Message&& message) {
  // This function will be called by other threads.
  Mutex::ScopedLock lock(mutex_);
  incoming_messages_.emplace_back(std::move(message));

  if (owner_ != nullptr) {
    Debug(owner_, "Adding message to incoming queue");
    owner_->TriggerAsync();
  }
}

void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
  CHECK_NULL(a->sibling_);
  CHECK_NULL(b->sibling_);
  a->sibling_ = b;
  b->sibling_ = a;
  a->sibling_mutex_ = b->sibling_mutex_;
}

void MessagePortData::Disentangle() {
  // Grab a copy of the sibling mutex, then replace it so that each sibling
  // has its own sibling_mutex_ now.
  std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
  Mutex::ScopedLock sibling_lock(*sibling_mutex);
  sibling_mutex_ = std::make_shared<Mutex>();

  MessagePortData* sibling = sibling_;
  if (sibling_ != nullptr) {
    sibling_->sibling_ = nullptr;
    sibling_ = nullptr;
  }

  // We close MessagePorts after disentanglement, so we enqueue a corresponding
  // message and trigger the corresponding uv_async_t to let them know that
  // this happened.
  AddToIncomingQueue(Message());
  if (sibling != nullptr) {
    sibling->AddToIncomingQueue(Message());
  }
}

MessagePort::~MessagePort() {
  if (data_) Detach();
}

MessagePort::MessagePort(Environment* env,
                         Local<Context> context,
                         Local<Object> wrap)
  : HandleWrap(env,
               wrap,
               reinterpret_cast<uv_handle_t*>(&async_),
               AsyncWrap::PROVIDER_MESSAGEPORT),
    data_(new MessagePortData(this)) {
  auto onmessage = [](uv_async_t* handle) {
    // Called when data has been put into the queue.
    MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
    channel->OnMessage();
  };

  CHECK_EQ(uv_async_init(env->event_loop(),
                         &async_,
                         onmessage), 0);
  // Reset later to indicate success of the constructor.
  bool succeeded = false;
  auto cleanup = OnScopeLeave([&]() { if (!succeeded) Close(); });

  Local<Value> fn;
  if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
    return;

  if (fn->IsFunction()) {
    Local<Function> init = fn.As<Function>();
    if (init->Call(context, wrap, 0, nullptr).IsEmpty())
      return;
  }

  Local<Function> emit_message_fn;
  if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
    return;
  emit_message_fn_.Reset(env->isolate(), emit_message_fn);

  succeeded = true;
  Debug(this, "Created message port");
}

bool MessagePort::IsDetached() const {
  return data_ == nullptr || IsHandleClosing();
}

void MessagePort::TriggerAsync() {
  if (IsHandleClosing()) return;
  CHECK_EQ(uv_async_send(&async_), 0);
}

void MessagePort::Close(v8::Local<v8::Value> close_callback) {
  Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));

  if (data_) {
    // Wrap this call with accessing the mutex, so that TriggerAsync()
    // can check IsHandleClosing() without race conditions.
    Mutex::ScopedLock sibling_lock(data_->mutex_);
    HandleWrap::Close(close_callback);
  } else {
    HandleWrap::Close(close_callback);
  }
}

void MessagePort::New(const FunctionCallbackInfo<Value>& args) {
  // This constructor just throws an error. Unfortunately, we can’t use V8’s
  // ConstructorBehavior::kThrow, as that also removes the prototype from the
  // class (i.e. makes it behave like an arrow function).
  Environment* env = Environment::GetCurrent(args);
  THROW_ERR_CONSTRUCT_CALL_INVALID(env);
}

MessagePort* MessagePort::New(
    Environment* env,
    Local<Context> context,
    std::unique_ptr<MessagePortData> data) {
  Context::Scope context_scope(context);
  Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);

  // Construct a new instance, then assign the listener instance and possibly
  // the MessagePortData to it.
  Local<Object> instance;
  if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
    return nullptr;
  MessagePort* port = new MessagePort(env, context, instance);
  CHECK_NOT_NULL(port);
  if (port->IsHandleClosing()) {
    // Construction failed with an exception.
    return nullptr;
  }

  if (data) {
    port->Detach();
    port->data_ = std::move(data);

    // This lock is here to avoid race conditions with the `owner_` read
    // in AddToIncomingQueue(). (This would likely be unproblematic without it,
    // but it's better to be safe than sorry.)
    Mutex::ScopedLock lock(port->data_->mutex_);
    port->data_->owner_ = port;
    // If the existing MessagePortData object had pending messages, this is
    // the easiest way to run that queue.
    port->TriggerAsync();
  }
  return port;
}

MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
                                              bool only_if_receiving) {
  Message received;
  {
    // Get the head of the message queue.
    Mutex::ScopedLock lock(data_->mutex_);

    Debug(this, "MessagePort has message");

    bool wants_message = receiving_messages_ || !only_if_receiving;
    // We have nothing to do if:
    // - There are no pending messages
    // - We are not intending to receive messages, and the message we would
    //   receive is not the final "close" message.
    if (data_->incoming_messages_.empty() ||
        (!wants_message &&
         !data_->incoming_messages_.front().IsCloseMessage())) {
      return env()->no_message_symbol();
    }

    received = std::move(data_->incoming_messages_.front());
    data_->incoming_messages_.pop_front();
  }

  if (received.IsCloseMessage()) {
    Close();
    return env()->no_message_symbol();
  }

  if (!env()->can_call_into_js()) return MaybeLocal<Value>();

  return received.Deserialize(env(), context);
}

void MessagePort::OnMessage() {
  Debug(this, "Running MessagePort::OnMessage()");
  HandleScope handle_scope(env()->isolate());
  Local<Context> context = object(env()->isolate())->CreationContext();

  size_t processing_limit;
  {
    Mutex::ScopedLock(data_->mutex_);
    processing_limit = std::max(data_->incoming_messages_.size(),
                                static_cast<size_t>(1000));
  }

  // data_ can only ever be modified by the owner thread, so no need to lock.
  // However, the message port may be transferred while it is processing
  // messages, so we need to check that this handle still owns its `data_` field
  // on every iteration.
  while (data_) {
    if (processing_limit-- == 0) {
      // Prevent event loop starvation by only processing those messages without
      // interruption that were already present when the OnMessage() call was
      // first triggered, but at least 1000 messages because otherwise the
      // overhead of repeatedly triggering the uv_async_t instance becomes
      // noticable, at least on Windows.
      // (That might require more investigation by somebody more familiar with
      // Windows.)
      TriggerAsync();
      return;
    }

    HandleScope handle_scope(env()->isolate());
    Context::Scope context_scope(context);

    Local<Value> payload;
    if (!ReceiveMessage(context, true).ToLocal(&payload)) break;
    if (payload == env()->no_message_symbol()) break;

    if (!env()->can_call_into_js()) {
      Debug(this, "MessagePort drains queue because !can_call_into_js()");
      // In this case there is nothing to do but to drain the current queue.
      continue;
    }

    Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
    if (MakeCallback(emit_message, 1, &payload).IsEmpty()) {
      // Re-schedule OnMessage() execution in case of failure.
      if (data_)
        TriggerAsync();
      return;
    }
  }
}

void MessagePort::OnClose() {
  Debug(this, "MessagePort::OnClose()");
  if (data_) {
    // Detach() returns move(data_).
    Detach()->Disentangle();
  }
}

std::unique_ptr<MessagePortData> MessagePort::Detach() {
  CHECK(data_);
  Mutex::ScopedLock lock(data_->mutex_);
  data_->owner_ = nullptr;
  return std::move(data_);
}


Maybe<bool> MessagePort::PostMessage(Environment* env,
                                     Local<Value> message_v,
                                     const TransferList& transfer_v) {
  Isolate* isolate = env->isolate();
  Local<Object> obj = object(isolate);
  Local<Context> context = obj->CreationContext();

  Message msg;

  // Per spec, we need to both check if transfer list has the source port, and
  // serialize the input message, even if the MessagePort is closed or detached.

  Maybe<bool> serialization_maybe =
      msg.Serialize(env, context, message_v, transfer_v, obj);
  if (data_ == nullptr) {
    return serialization_maybe;
  }
  if (serialization_maybe.IsNothing()) {
    return Nothing<bool>();
  }

  Mutex::ScopedLock lock(*data_->sibling_mutex_);
  bool doomed = false;

  // Check if the target port is posted to itself.
  if (data_->sibling_ != nullptr) {
    for (const auto& port_data : msg.message_ports()) {
      if (data_->sibling_ == port_data.get()) {
        doomed = true;
        ProcessEmitWarning(env, "The target port was posted to itself, and "
                                "the communication channel was lost");
        break;
      }
    }
  }

  if (data_->sibling_ == nullptr || doomed)
    return Just(true);

  data_->sibling_->AddToIncomingQueue(std::move(msg));
  return Just(true);
}

static Maybe<bool> ReadIterable(Environment* env,
                                Local<Context> context,
                                // NOLINTNEXTLINE(runtime/references)
                                TransferList& transfer_list,
                                Local<Value> object) {
  if (!object->IsObject()) return Just(false);

  if (object->IsArray()) {
    Local<Array> arr = object.As<Array>();
    size_t length = arr->Length();
    transfer_list.AllocateSufficientStorage(length);
    for (size_t i = 0; i < length; i++) {
      if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
        return Nothing<bool>();
    }
    return Just(true);
  }

  Isolate* isolate = env->isolate();
  Local<Value> iterator_method;
  if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
      .ToLocal(&iterator_method)) return Nothing<bool>();
  if (!iterator_method->IsFunction()) return Just(false);

  Local<Value> iterator;
  if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
      .ToLocal(&iterator)) return Nothing<bool>();
  if (!iterator->IsObject()) return Just(false);

  Local<Value> next;
  if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
    return Nothing<bool>();
  if (!next->IsFunction()) return Just(false);

  std::vector<Local<Value>> entries;
  while (env->can_call_into_js()) {
    Local<Value> result;
    if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
        .ToLocal(&result)) return Nothing<bool>();
    if (!result->IsObject()) return Just(false);

    Local<Value> done;
    if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
      return Nothing<bool>();
    if (done->BooleanValue(isolate)) break;

    Local<Value> val;
    if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
      return Nothing<bool>();
    entries.push_back(val);
  }

  transfer_list.AllocateSufficientStorage(entries.size());
  std::copy(entries.begin(), entries.end(), &transfer_list[0]);
  return Just(true);
}

void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
  Environment* env = Environment::GetCurrent(args);
  Local<Object> obj = args.This();
  Local<Context> context = obj->CreationContext();

  if (args.Length() == 0) {
    return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
                                       "MessagePort.postMessage");
  }

  if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
    // Browsers ignore null or undefined, and otherwise accept an array or an
    // options object.
    return THROW_ERR_INVALID_ARG_TYPE(env,
        "Optional transferList argument must be an iterable");
  }

  TransferList transfer_list;
  if (args[1]->IsObject()) {
    bool was_iterable;
    if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
      return;
    if (!was_iterable) {
      Local<Value> transfer_option;
      if (!args[1].As<Object>()->Get(context, env->transfer_string())
          .ToLocal(&transfer_option)) return;
      if (!transfer_option->IsUndefined()) {
        if (!ReadIterable(env, context, transfer_list, transfer_option)
            .To(&was_iterable)) return;
        if (!was_iterable) {
          return THROW_ERR_INVALID_ARG_TYPE(env,
              "Optional options.transfer argument must be an iterable");
        }
      }
    }
  }

  MessagePort* port = Unwrap<MessagePort>(args.This());
  // Even if the backing MessagePort object has already been deleted, we still
  // want to serialize the message to ensure spec-compliant behavior w.r.t.
  // transfers.
  if (port == nullptr) {
    Message msg;
    USE(msg.Serialize(env, context, args[0], transfer_list, obj));
    return;
  }

  port->PostMessage(env, args[0], transfer_list);
}

void MessagePort::Start() {
  Debug(this, "Start receiving messages");
  receiving_messages_ = true;
  Mutex::ScopedLock lock(data_->mutex_);
  if (!data_->incoming_messages_.empty())
    TriggerAsync();
}

void MessagePort::Stop() {
  Debug(this, "Stop receiving messages");
  receiving_messages_ = false;
}

void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
  MessagePort* port;
  ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
  if (!port->data_) {
    return;
  }
  port->Start();
}

void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
  MessagePort* port;
  CHECK(args[0]->IsObject());
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
  if (!port->data_) {
    return;
  }
  port->Stop();
}

void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
  MessagePort* port;
  ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
  port->OnMessage();
}

void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
  Environment* env = Environment::GetCurrent(args);
  if (!args[0]->IsObject() ||
      !env->message_port_constructor_template()->HasInstance(args[0])) {
    return THROW_ERR_INVALID_ARG_TYPE(env,
        "First argument needs to be a MessagePort instance");
  }
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
  if (port == nullptr) {
    // Return 'no messages' for a closed port.
    args.GetReturnValue().Set(
        Environment::GetCurrent(args)->no_message_symbol());
    return;
  }

  MaybeLocal<Value> payload =
      port->ReceiveMessage(port->object()->CreationContext(), false);
  if (!payload.IsEmpty())
    args.GetReturnValue().Set(payload.ToLocalChecked());
}

void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
  Environment* env = Environment::GetCurrent(args);
  if (!args[0]->IsObject() ||
      !env->message_port_constructor_template()->HasInstance(args[0])) {
    return THROW_ERR_INVALID_ARG_TYPE(env,
        "First argument needs to be a MessagePort instance");
  }
  MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
  CHECK_NOT_NULL(port);

  Local<Value> context_arg = args[1];
  ContextifyContext* context_wrapper;
  if (!context_arg->IsObject() ||
      (context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
          env, context_arg.As<Object>())) == nullptr) {
    return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
  }

  std::unique_ptr<MessagePortData> data;
  if (!port->IsDetached())
    data = port->Detach();

  Context::Scope context_scope(context_wrapper->context());
  MessagePort* target =
      MessagePort::New(env, context_wrapper->context(), std::move(data));
  if (target != nullptr)
    args.GetReturnValue().Set(target->object());
}

void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
  Entangle(a, b->data_.get());
}

void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
  MessagePortData::Entangle(a->data_.get(), b);
}

void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
  tracker->TrackField("data", data_);
  tracker->TrackField("emit_message_fn", emit_message_fn_);
}

Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
  // Factor generating the MessagePort JS constructor into its own piece
  // of code, because it is needed early on in the child environment setup.
  Local<FunctionTemplate> templ = env->message_port_constructor_template();
  if (!templ.IsEmpty())
    return templ;

  {
    Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
    m->SetClassName(env->message_port_constructor_string());
    m->InstanceTemplate()->SetInternalFieldCount(
        MessagePort::kInternalFieldCount);
    m->Inherit(HandleWrap::GetConstructorTemplate(env));

    env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
    env->SetProtoMethod(m, "start", MessagePort::Start);

    env->set_message_port_constructor_template(m);
  }

  return GetMessagePortConstructorTemplate(env);
}

namespace {

static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
  Environment* env = Environment::GetCurrent(args);
  if (!args.IsConstructCall()) {
    THROW_ERR_CONSTRUCT_CALL_REQUIRED(env);
    return;
  }

  Local<Context> context = args.This()->CreationContext();
  Context::Scope context_scope(context);

  MessagePort* port1 = MessagePort::New(env, context);
  if (port1 == nullptr) return;
  MessagePort* port2 = MessagePort::New(env, context);
  if (port2 == nullptr) {
    port1->Close();
    return;
  }

  MessagePort::Entangle(port1, port2);

  args.This()->Set(context, env->port1_string(), port1->object())
      .Check();
  args.This()->Set(context, env->port2_string(), port2->object())
      .Check();
}

static void InitMessaging(Local<Object> target,
                          Local<Value> unused,
                          Local<Context> context,
                          void* priv) {
  Environment* env = Environment::GetCurrent(context);

  {
    Local<String> message_channel_string =
        FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
    Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
    templ->SetClassName(message_channel_string);
    target->Set(context,
                message_channel_string,
                templ->GetFunction(context).ToLocalChecked()).Check();
  }

  target->Set(context,
              env->message_port_constructor_string(),
              GetMessagePortConstructorTemplate(env)
                  ->GetFunction(context).ToLocalChecked()).Check();

  // These are not methods on the MessagePort prototype, because
  // the browser equivalents do not provide them.
  env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
  env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
  env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
  env->SetMethod(target, "moveMessagePortToContext",
                 MessagePort::MoveToContext);

  {
    Local<Function> domexception = GetDOMException(context).ToLocalChecked();
    target
        ->Set(context,
              FIXED_ONE_BYTE_STRING(env->isolate(), "DOMException"),
              domexception)
        .Check();
  }
}

}  // anonymous namespace

}  // namespace worker
}  // namespace node

NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging)