trema/trema-edge

View on GitHub
src/lib/messenger.c

Summary

Maintainability
Test Coverage
/*
 * Author: Toshio Koide
 *
 * Copyright (C) 2008-2013 NEC Corporation
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License, version 2, as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */


#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <linux/limits.h>
#include <linux/sockios.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#include "doubly_linked_list.h"
#include "event_handler.h"
#include "hash_table.h"
#include "log.h"
#include "messenger.h"
#include "timer.h"
#include "wrapper.h"


#ifdef UNIT_TESTING

#define static

// Redirect socket functions to mock functions in the unit test.
#ifdef socket
#undef socket
#endif
#define socket mock_socket
extern int mock_socket( int domain, int type, int protocol );

#ifdef bind
#undef bind
#endif
#define bind mock_bind
extern int mock_bind( int sockfd, const struct sockaddr *addr, socklen_t addrlen );

#ifdef listen
#undef listen
#endif
#define listen mock_listen
extern int mock_listen( int sockfd, int backlog );

#ifdef close
#undef close
#endif
#define close mock_close
extern int mock_close( int fd );

#ifdef connect
#undef connect
#endif
#define connect mock_connect
extern int mock_connect( int sockfd, const struct sockaddr *addr, socklen_t addrlen );

#ifdef select
#undef select
#endif
#define select mock_select
extern int mock_select( int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout );

#ifdef accept
#undef accept
#endif
#define accept mock_accept
extern int mock_accept( int sockfd, struct sockaddr *addr, socklen_t *addrlen );

#ifdef recv
#undef recv
#endif
#define recv mock_recv
extern ssize_t mock_recv( int sockfd, void *buf, size_t len, int flags );

#ifdef send
#undef send
#endif
#define send mock_send
extern ssize_t mock_send( int sockfd, const void *buf, size_t len, int flags );

#ifdef setsockopt
#undef setsockopt
#endif
#define setsockopt mock_setsockopt
extern int mock_setsockopt( int s, int level, int optname, const void *optval, socklen_t optlen );

#ifdef clock_gettime
#undef clock_gettime
#endif
#define clock_gettime mock_clock_gettime
extern int mock_clock_gettime( clockid_t clk_id, struct timespec *tp );

#ifdef error
#undef error
#endif
#define error mock_error
extern void mock_error( const char *format, ... );

#ifdef debug
#undef debug
#endif
#define debug mock_debug
extern void mock_debug( const char *format, ... );

#ifdef warn
#undef warn
#endif
#define warn mock_warn
extern void mock_warn( const char *format, ... );

#ifdef add_periodic_event_callback
#undef add_periodic_event_callback
#endif
#define add_periodic_event_callback mock_add_periodic_event_callback
extern bool mock_add_periodic_event_callback( const time_t seconds, void ( *callback )( void *user_data ), void *user_data );

#endif // UNIT_TESTING


enum {
  MESSAGE_TYPE_NOTIFY,
  MESSAGE_TYPE_REQUEST,
  MESSAGE_TYPE_REPLY,
};

typedef struct message_buffer {
  void *buffer;
  size_t data_length;
  size_t size;
  size_t head_offset;
} message_buffer;

typedef struct messenger_socket {
  int fd;
} messenger_socket;

typedef struct messenger_context {
  uint32_t transaction_id;
  int life_count;
  void *user_data;
} messenger_context;

typedef struct receive_queue_callback {
  void  *function;
  uint8_t message_type;
} receive_queue_callback;

typedef struct receive_queue {
  char service_name[ MESSENGER_SERVICE_NAME_LENGTH ];
  dlist_element *message_callbacks;
  int listen_socket;
  struct sockaddr_un listen_addr;
  dlist_element *client_sockets;
  message_buffer *buffer;
} receive_queue;

typedef struct send_queue {
  char service_name[ MESSENGER_SERVICE_NAME_LENGTH ];
  int server_socket;
  int refused_count;
  struct timespec reconnect_interval;
  struct sockaddr_un server_addr;
  message_buffer *buffer;
  bool running_timer;
  uint32_t overflow;
  uint64_t overflow_total_length;
  int socket_buffer_size;
} send_queue;


#define MESSENGER_RECV_BUFFER 100000
static const uint32_t messenger_send_queue_length = MESSENGER_RECV_BUFFER * 4;
static const uint32_t messenger_send_length_for_flush = MESSENGER_RECV_BUFFER;
static const uint32_t messenger_bucket_size = MESSENGER_RECV_BUFFER;
static const uint32_t messenger_recv_queue_length = MESSENGER_RECV_BUFFER * 2;
static const uint32_t messenger_recv_queue_reserved = MESSENGER_RECV_BUFFER;

char socket_directory[ PATH_MAX ];
static bool initialized = false;
static bool finalized = false;
static hash_table *receive_queues = NULL;
static hash_table *send_queues = NULL;
static hash_table *context_db = NULL;
static char *_dump_service_name = NULL;
static char *_dump_app_name = NULL;
static uint32_t last_transaction_id = 0;

static void on_accept( int fd, void *data );
static void on_recv( int fd, void *data );
static void on_send_write( int fd, void *data );
static void on_send_read( int fd, void *data );

static void
_delete_context( void *key, void *value, void *user_data ) {
  assert( value != NULL );
  UNUSED( key );
  UNUSED( user_data );
  messenger_context *context = value;

  debug( "Deleting a context ( transaction_id = %#x, life_count = %d, user_data = %p ).",
         context->transaction_id, context->life_count, context->user_data );

  delete_hash_entry( context_db, &context->transaction_id );
  xfree( context );
}


static void
delete_context( messenger_context *context ) {
  assert( context != NULL );

  _delete_context( &context->transaction_id, context, NULL );
}


static void
_age_context( void *key, void *value, void *user_data ) {
  assert( value != NULL );
  UNUSED( key );
  UNUSED( user_data );
  messenger_context *context = value;
  context->life_count--;
  if ( context->life_count <= 0 ) {
    delete_context( context );
  }
}


static void
age_context_db( void *user_data ) {
  UNUSED( user_data );

  debug( "Aging context database ( context_db = %p ).", context_db );

  foreach_hash( context_db, _age_context, NULL );
}


bool
init_messenger( const char *working_directory ) {
  assert( working_directory != NULL );

  init_event_handler();

  if ( initialized ) {
    warn( "Messenger is already initialized." );
    return true;
  }

  strcpy( socket_directory, working_directory );

  receive_queues = create_hash_with_size( compare_string, hash_string, 8 );
  send_queues = create_hash_with_size( compare_string, hash_string, 8 );
  context_db = create_hash_with_size( compare_uint32, hash_uint32, 128 );

  initialized = true;
  finalized = false;

  return initialized;
}


static void
delete_context_db( void ) {
  debug( "Deleting context database ( context_db = %p ).", context_db );

  if ( context_db != NULL ) {
    foreach_hash( context_db, _delete_context, NULL );
    delete_hash( context_db );
    context_db = NULL;
  }
}


static void
free_message_buffer( message_buffer *buf ) {
  assert( buf != NULL );

  xfree( buf->buffer );
  xfree( buf );
}


static void*
get_message_buffer_head( message_buffer *buf ) {
  return ( char * ) buf->buffer + buf->head_offset;
}


static void
delete_send_queue( send_queue *sq ) {
  assert( NULL != sq );

  debug( "Deleting a send queue ( service_name = %s, fd = %d ).", sq->service_name, sq->server_socket );

  free_message_buffer( sq->buffer );
  if ( sq->server_socket != -1 ) {
    set_readable( sq->server_socket, false );
    set_writable( sq->server_socket, false );
    delete_fd_handler( sq->server_socket );

    close( sq->server_socket );
  }
  if ( send_queues != NULL ) {
    delete_hash_entry( send_queues, sq->service_name );
  }
  else {
    error( "All send queues are already deleted or not created yet." );
  }
  xfree( sq );
}


static void
delete_all_send_queues() {
  hash_iterator iter;
  hash_entry *e;

  debug( "Deleting all send queues ( send_queues = %p ).", send_queues );

  if ( send_queues != NULL ) {
    init_hash_iterator( send_queues, &iter );
    while ( ( e = iterate_hash_next( &iter ) ) != NULL ) {
      delete_send_queue( e->value );
    }
    delete_hash( send_queues );
    send_queues = NULL;
  }
  else {
    error( "All send queues are already deleted or not created yet." );
  }
}


static void
send_dump_message( uint16_t dump_type, const char *service_name, const void *data, uint32_t data_len ) {
  assert( service_name != NULL );

  debug( "Sending a dump message ( dump_type = %#x, service_name = %s, data = %p, data_len = %u ).",
         dump_type, service_name, data, data_len );

  size_t service_name_len, app_name_len;
  char *dump_buf, *p;
  message_dump_header *dump_hdr;
  size_t dump_buf_len;

  if ( _dump_service_name == NULL ) {
    debug( "Dump service name is not set." );
    return;
  }
  if ( strcmp( service_name, _dump_service_name ) == 0 ) {
    debug( "Source service name and destination service name are the same ( service name = %s ).", service_name );
    return;
  }

  struct timespec now;
  if ( clock_gettime( CLOCK_REALTIME, &now ) == -1 ) {
    error( "Failed to retrieve system-wide real-time clock ( %s [%d] ).", strerror( errno ), errno );
    return;
  }

  service_name_len = strlen( service_name ) + 1;
  app_name_len = strlen( _dump_app_name ) + 1;
  dump_buf_len = sizeof( message_dump_header ) + app_name_len + service_name_len + data_len;
  dump_buf = xmalloc( dump_buf_len );
  dump_hdr = ( message_dump_header * ) dump_buf;

  // header
  dump_hdr->sent_time.sec = htonl( ( uint32_t ) now.tv_sec );
  dump_hdr->sent_time.nsec = htonl( ( uint32_t ) now.tv_nsec );
  dump_hdr->app_name_length = htons( ( uint16_t ) app_name_len );
  dump_hdr->service_name_length = htons( ( uint16_t ) service_name_len );
  dump_hdr->data_length = htonl( data_len );

  // app name
  p = dump_buf;
  p += sizeof( message_dump_header );
  memcpy( p, _dump_app_name, app_name_len );

  // service name
  p += app_name_len;
  memcpy( p, service_name, service_name_len );

  // data
  p += service_name_len;
  memcpy( p, data, data_len );

  // send
  send_message( _dump_service_name, dump_type, dump_buf, dump_buf_len );

  xfree( dump_buf );
}


/**
 * closes accepted sockets and listening socket, and releases memories.
 */
static void
delete_receive_queue( void *service_name, void *_rq, void *user_data ) {
  debug( "Deleting a receive queue ( service_name = %s, _rq = %p, user_data = %p ).", service_name, _rq, user_data );

  receive_queue *rq = _rq;
  messenger_socket *client_socket;
  dlist_element *element;
  receive_queue_callback *cb;

  assert( rq != NULL );
  for ( element = rq->message_callbacks->next; element; element = element->next ) {
    cb = element->data;
    debug( "Deleting a callback ( function = %p, message_type = %#x ).", cb->function, cb->message_type );
    xfree( cb );
  }
  delete_dlist( rq->message_callbacks );

  for ( element = rq->client_sockets->next; element; element = element->next ) {
    client_socket = element->data;

    debug( "Closing a client socket ( fd = %d ).", client_socket->fd );

    set_readable( client_socket->fd, false );
    delete_fd_handler( client_socket->fd );

    close( client_socket->fd );
    xfree( client_socket );
    send_dump_message( MESSENGER_DUMP_RECV_CLOSED, rq->service_name, NULL, 0 );
  }
  delete_dlist( rq->client_sockets );

  set_readable( rq->listen_socket, false );
  delete_fd_handler( rq->listen_socket );

  close( rq->listen_socket );
  free_message_buffer( rq->buffer );
  unlink( rq->listen_addr.sun_path );

  if ( receive_queues != NULL ) {
    delete_hash_entry( receive_queues, rq->service_name );
  }
  else {
    error( "All receive queues are already deleted or not created yet." );
  }
  xfree( rq );
}


static void
delete_all_receive_queues() {
  debug( "Deleting all receive queues ( receive_queues = %p ).", receive_queues );

  if ( receive_queues != NULL ) {
    foreach_hash( receive_queues, delete_receive_queue, NULL );
    delete_hash( receive_queues );
    receive_queues = NULL;
  }
  else {
    error( "All receive queues are already deleted or not created yet." );
  }
}


bool
finalize_messenger() {
  debug( "Finalizing messenger." );

  if ( !initialized ) {
    warn( "Messenger is not initialized yet." );
    return false;
  }
  if ( finalized ) {
    warn( "Messenger is already finalized." );
    return true;
  }

  if ( messenger_dump_enabled() ) {
    stop_messenger_dump();
  }
  if ( receive_queues != NULL ) {
    delete_all_receive_queues();
  }
  if ( send_queues != NULL ) {
    delete_all_send_queues();
  }
  if ( context_db != NULL ) {
    delete_context_db();
  }

  initialized = false;
  finalized = true;

  finalize_event_handler();

  return true;
}


static message_buffer *
create_message_buffer( size_t size ) {
  message_buffer *buf = xmalloc( sizeof( message_buffer ) );

  buf->buffer = xmalloc( size );
  buf->size = size;
  buf->data_length = 0;
  buf->head_offset = 0;

  return buf;
}


static receive_queue *
create_receive_queue( const char *service_name ) {
  assert( service_name != NULL );
  assert( strlen( service_name ) < MESSENGER_SERVICE_NAME_LENGTH );

  debug( "Creating a receive queue (service_name = %s).", service_name );

  assert( receive_queues != NULL );
  receive_queue *rq = lookup_hash_entry( receive_queues, service_name );
  if ( rq != NULL ) {
    warn( "Receive queue for %s is already created.", service_name );
    return rq;
  }

  rq = xmalloc( sizeof( receive_queue ) );
  memset( rq->service_name, 0, MESSENGER_SERVICE_NAME_LENGTH );
  strncpy( rq->service_name, service_name, MESSENGER_SERVICE_NAME_LENGTH );

  memset( &rq->listen_addr, 0, sizeof( struct sockaddr_un ) );
  rq->listen_addr.sun_family = AF_UNIX;
  sprintf( rq->listen_addr.sun_path, "%s/trema.%s.sock", socket_directory, service_name );
  debug( "Set sun_path to %s.", rq->listen_addr.sun_path );

  rq->listen_socket = socket( AF_UNIX, SOCK_SEQPACKET, 0 );
  if ( rq->listen_socket == -1 ) {
    error( "Failed to call socket (errno = %s [%d]).", strerror( errno ), errno );
    xfree( rq );
    return NULL;
  }

  unlink( rq->listen_addr.sun_path ); // FIXME: handle error correctly

  int ret;
  ret = bind( rq->listen_socket, ( struct sockaddr * ) &rq->listen_addr, sizeof( struct sockaddr_un ) );
  if ( ret == -1 ) {
    error( "Failed to bind (fd = %d, sun_path = %s, errno = %s [%d]).",
           rq->listen_socket, rq->listen_addr.sun_path, strerror( errno ), errno );
    close( rq->listen_socket );
    xfree( rq );
    return NULL;
  }

  ret = listen( rq->listen_socket, SOMAXCONN );
  if ( ret == -1 ) {
    error( "Failed to listen (fd = %d, sun_path = %s, errno = %s [%d]).",
           rq->listen_socket, rq->listen_addr.sun_path, strerror( errno ), errno );
    close( rq->listen_socket );
    xfree( rq );
    return NULL;
  }

  ret = fcntl( rq->listen_socket, F_SETFL, O_NONBLOCK );
  if ( ret < 0 ) {
    error( "Failed to set O_NONBLOCK ( %s [%d] ).", strerror( errno ), errno );
    close( rq->listen_socket );
    xfree( rq );
    return NULL;
  }

  set_fd_handler( rq->listen_socket, on_accept, rq, NULL, NULL );
  set_readable( rq->listen_socket, true );

  rq->message_callbacks = create_dlist();
  rq->client_sockets = create_dlist();
  rq->buffer = create_message_buffer( messenger_recv_queue_length );

  insert_hash_entry( receive_queues, rq->service_name, rq );

  return rq;
}


static bool
add_message_callback( const char *service_name, uint8_t message_type, void *callback ) {
  assert( receive_queues != NULL );
  assert( service_name != NULL );
  assert( callback != NULL );

  debug( "Adding a message callback (service_name = %s, message_type = %#x, callback = %p).",
         service_name, message_type, callback );

  receive_queue *rq = lookup_hash_entry( receive_queues, service_name );
  if ( rq == NULL ) {
    debug( "No receive queue found. Creating." );
    rq = create_receive_queue( service_name );
    if ( rq == NULL ) {
      error( "Failed to create a receive queue." );
      return false;
    }
  }

  receive_queue_callback *cb = xmalloc( sizeof( receive_queue_callback ) );
  cb->message_type = message_type;
  cb->function = callback;
  insert_after_dlist( rq->message_callbacks, cb );

  return true;
}


static bool
_add_message_received_callback( const char *service_name, const callback_message_received callback ) {
  assert( service_name != NULL );
  assert( callback != NULL );

  debug( "Adding a message received callback (service_name = %s, callback = %p).",
         service_name, callback );

  return add_message_callback( service_name, MESSAGE_TYPE_NOTIFY, callback );
}
bool ( *add_message_received_callback )( const char *service_name, const callback_message_received function ) = _add_message_received_callback;


static bool
_add_message_requested_callback( const char *service_name,
                                 void ( *callback )( const messenger_context_handle *handle, uint16_t tag, void *data, size_t len ) ) {
  assert( service_name != NULL );
  assert( callback != NULL );

  debug( "Adding a message requested callback ( service_name = %s, callback = %p ).",
         service_name, callback );

  return add_message_callback( service_name, MESSAGE_TYPE_REQUEST, callback );
}
bool ( *add_message_requested_callback )( const char *service_name, void ( *callback )( const messenger_context_handle *handle, uint16_t tag, void *data, size_t len ) ) = _add_message_requested_callback;


static bool
_add_message_replied_callback( const char *service_name, void ( *callback )( uint16_t tag, void *data, size_t len, void *user_data ) ) {
  assert( service_name != NULL );
  assert( callback != NULL );

  debug( "Adding a message replied callback ( service_name = %s, callback = %p ).",
         service_name, callback );

  return add_message_callback( service_name, MESSAGE_TYPE_REPLY, callback );
}
bool ( *add_message_replied_callback )( const char *service_name, void ( *callback )( uint16_t tag, void *data, size_t len, void *user_data ) ) = _add_message_replied_callback;


static bool
delete_message_callback( const char *service_name, uint8_t message_type, void ( *callback ) ) {
  assert( service_name != NULL );
  assert( callback != NULL );

  debug( "Deleting a message callback ( service_name = %s, message_type = %#x, callback = %p ).",
         service_name, message_type, callback );

  if ( receive_queues == NULL ) {
    debug( "All receive queues are already deleted or not created yet." );
    return false;
  }

  receive_queue *rq = lookup_hash_entry( receive_queues, service_name );
  receive_queue_callback *cb;

  if ( NULL != rq ) {
    dlist_element *e;
    for ( e = rq->message_callbacks->next; e; e = e->next ) {
      cb = e->data;
      if ( ( cb->function == callback ) && ( cb->message_type == message_type ) ) {
        debug( "Deleting a callback ( message_type = %#x, callback = %p ).", message_type, callback );
        xfree( cb );
        delete_dlist_element( e );
        if ( rq->message_callbacks->next == NULL ) {
          debug( "No more callback for message_type = %#x.", message_type );
          delete_receive_queue( rq->service_name, rq, NULL );
        }
        return true;
      }
    }
  }

  error( "No registered message callback found." );

  return false;
}


static bool
_delete_message_received_callback( const char *service_name, void ( *callback )( uint16_t tag, void *data, size_t len ) ) {
  assert( service_name != NULL );
  assert( callback != NULL );

  debug( "Deleting a message received callback ( service_name = %s, callback = %p ).",
         service_name, callback );

  return delete_message_callback( service_name, MESSAGE_TYPE_NOTIFY, callback );
}
bool ( *delete_message_received_callback )( const char *service_name, void ( *callback )( uint16_t tag, void *data, size_t len ) ) = _delete_message_received_callback;


static bool
_delete_message_requested_callback( const char *service_name,
  void ( *callback )( const messenger_context_handle *handle, uint16_t tag, void *data, size_t len ) ) {
  assert( service_name != NULL );
  assert( callback != NULL );

  debug( "Deleting a message requested callback ( service_name = %s, callback = %p ).",
         service_name, callback );

  return delete_message_callback( service_name, MESSAGE_TYPE_REQUEST, callback );
}
bool ( *delete_message_requested_callback )( const char *service_name, void ( *callback )( const messenger_context_handle *handle, uint16_t tag, void *data, size_t len ) ) = _delete_message_requested_callback;


static bool
_delete_message_replied_callback( const char *service_name, void ( *callback )( uint16_t tag, void *data, size_t len, void *user_data ) ) {
  assert( service_name != NULL );
  assert( callback != NULL );

  debug( "Deleting a message replied callback ( service_name = %s, callback = %p ).",
         service_name, callback );

  return delete_message_callback( service_name, MESSAGE_TYPE_REPLY, callback );
}
bool ( *delete_message_replied_callback )( const char *service_name, void ( *callback )( uint16_t tag, void *data, size_t len, void *user_data ) ) = _delete_message_replied_callback;


static bool
_rename_message_received_callback( const char *old_service_name, const char *new_service_name ) {
  assert( old_service_name != NULL );
  assert( new_service_name != NULL );
  assert( receive_queues != NULL );

  debug( "Renaming a message received callback ( old_service_name = %s, new_service_name = %s ).",
         old_service_name, new_service_name );

  receive_queue *old_rq = lookup_hash_entry( receive_queues, old_service_name );
  receive_queue *new_rq = lookup_hash_entry( receive_queues, new_service_name );
  dlist_element *element;
  receive_queue_callback *cb;

  if ( old_rq == NULL ) {
    error( "No receive queue for old service name ( %s ) found.", old_service_name );
    return false;
  }
  else if ( new_rq != NULL ) {
    error( "Receive queue for new service name ( %s ) is already created.", new_service_name );
    return false;
  }

  for ( element = old_rq->message_callbacks->next; element; element = element->next ) {
    cb = element->data;
    add_message_callback( new_service_name, cb->message_type, cb->function );
  }

  delete_receive_queue( old_rq->service_name, old_rq, NULL );

  return true;
}
bool ( *rename_message_received_callback )( const char *old_service_name, const char *new_service_name ) = _rename_message_received_callback;


static size_t
message_buffer_remain_bytes( message_buffer *buf ) {
  assert( buf != NULL );

  return buf->size - buf->data_length;
}


/**
 * connects send_queue to the service
 * return value: -1:error, 0:refused (retry), 1:connected
 */
static int
send_queue_connect( send_queue *sq ) {
  assert( sq != NULL );

  sq->running_timer = false;
  if ( ( sq->server_socket = socket( AF_UNIX, SOCK_SEQPACKET, 0 ) ) == -1 ) {
    error( "Failed to call socket ( errno = %s [%d] ).", strerror( errno ), errno );
    return -1;
  }

  if ( geteuid() == 0 ) {
    int wmem_size = 1048576;
    int ret = setsockopt( sq->server_socket, SOL_SOCKET, SO_SNDBUFFORCE, ( const void * ) &wmem_size, ( socklen_t ) sizeof( wmem_size ) );
    if ( ret < 0 ) {
      error( "Failed to set SO_SNDBUFFORCE to %d ( %s [%d] ).", wmem_size, strerror( errno ), errno );
      close( sq->server_socket );
      sq->server_socket = -1;
      return -1;
    }
  }
  int ret = fcntl( sq->server_socket, F_SETFL, O_NONBLOCK );
  if ( ret < 0 ) {
    error( "Failed to set O_NONBLOCK ( %s [%d] ).", strerror( errno ), errno );
    close( sq->server_socket );
    sq->server_socket = -1;
    return -1;
  }

  if ( connect( sq->server_socket, ( struct sockaddr * ) &sq->server_addr, sizeof( struct sockaddr_un ) ) == -1 ) {
    debug( "Connection refused ( service_name = %s, sun_path = %s, fd = %d, errno = %s [%d] ).",
           sq->service_name, sq->server_addr.sun_path, sq->server_socket, strerror( errno ), errno );

    send_dump_message( MESSENGER_DUMP_SEND_REFUSED, sq->service_name, NULL, 0 );
    close( sq->server_socket );
    sq->server_socket = -1;

    return 0;
  }

  set_fd_handler( sq->server_socket, on_send_read, sq, &on_send_write, sq );
  set_readable( sq->server_socket, true );

  if ( sq->buffer != NULL && sq->buffer->data_length >= sizeof( message_header ) ) {
    set_writable( sq->server_socket, true );
  }

  debug( "Connection established ( service_name = %s, sun_path = %s, fd = %d ).",
         sq->service_name, sq->server_addr.sun_path, sq->server_socket );

  socklen_t optlen = sizeof ( sq->socket_buffer_size );
  if ( getsockopt( sq->server_socket, SOL_SOCKET, SO_SNDBUF, &sq->socket_buffer_size, &optlen ) == -1 ) {
    sq->socket_buffer_size = 0;
  }

  send_dump_message( MESSENGER_DUMP_SEND_CONNECTED, sq->service_name, NULL, 0 );

  return 1;
}


static int send_queue_connect_timer( send_queue *sq );

static int
send_queue_connect_timeout( send_queue *sq ) {
  sq->running_timer = false;
  return send_queue_connect_timer( sq );
}

// Remember to clean up timer if we delete the send_queue.

static int
send_queue_connect_timer( send_queue *sq ) {
  struct itimerspec interval;
  if ( sq->server_socket != -1 ) {
    return 1;
  }
  if ( sq->running_timer ) {
    sq->running_timer = false;
    delete_timer_event( ( timer_callback )send_queue_connect_timeout, sq );
  }

  int ret = send_queue_connect( sq );
  int error = -1;

  switch ( ret ) {
  case -1:
    // Print an error, and find a better way of indicating the send
    // queue has an error.
    sq->reconnect_interval.tv_sec = -1;
    sq->reconnect_interval.tv_nsec = 0;
    break;

  case 0:
    // Try again later.
    sq->refused_count++;
    sq->reconnect_interval.tv_sec = ( 1 << ( sq->refused_count > 4 ? 4 : sq->refused_count - 1 ) );

    interval.it_interval.tv_sec = 0;
    interval.it_interval.tv_nsec = 0;
    interval.it_value = sq->reconnect_interval;
    add_timer_event_callback( &interval, ( void (*)(void *) )send_queue_connect_timeout, ( void * ) sq );
    sq->running_timer = true;

    debug( "refused_count = %d, reconnect_interval = %u.", sq->refused_count, sq->reconnect_interval.tv_sec );
    error =  0;
    break;

  case 1:
    // Success.
    sq->refused_count = 0;
    sq->reconnect_interval.tv_sec = 0;
    sq->reconnect_interval.tv_nsec = 0;
    error = 0;
    break;

  default:
    die( "Got invalid value from send_queue_connect_timer( send_queue* )." );
    break;
  }

  return error;
}


static int
send_queue_try_connect( send_queue *sq ) {
  // TODO: Add a proper check for this.
  if ( sq->reconnect_interval.tv_sec != 0 ) {
    return 0;
  }

  return send_queue_connect_timer( sq );
}


/**
 * creates send_queue and connects to specified service name.
 */
static send_queue *
create_send_queue( const char *service_name ) {
  assert( service_name != NULL );

  debug( "Creating a send queue ( service_name = %s ).", service_name );

  send_queue *sq;

  assert( send_queues != NULL );

  sq = lookup_hash_entry( send_queues, service_name );
  if ( NULL != sq ) {
    warn( "Send queue for %s is already created.", service_name );
    return sq;
  }

  sq = xmalloc( sizeof( send_queue ) );
  memset( sq->service_name, 0, MESSENGER_SERVICE_NAME_LENGTH );
  strncpy( sq->service_name, service_name, MESSENGER_SERVICE_NAME_LENGTH );

  memset( &sq->server_addr, 0, sizeof( struct sockaddr_un ) );
  sq->server_addr.sun_family = AF_UNIX;
  sprintf( sq->server_addr.sun_path, "%s/trema.%s.sock", socket_directory, service_name );
  debug( "Set sun_path to %s.", sq->server_addr.sun_path );

  sq->server_socket = -1;
  sq->buffer = NULL;
  sq->refused_count = 0;
  sq->reconnect_interval.tv_sec = 0;
  sq->reconnect_interval.tv_nsec = 0;
  sq->running_timer = false;
  sq->overflow = 0;
  sq->overflow_total_length = 0;
  sq->socket_buffer_size = 0;

  if ( send_queue_try_connect( sq ) == -1 ) {
    xfree( sq );
    error( "Failed to create a send queue for %s.", service_name );
    return NULL;
  }

  sq->buffer = create_message_buffer( messenger_send_queue_length );

  insert_hash_entry( send_queues, sq->service_name, sq );

  return sq;
}


static bool
write_message_buffer( message_buffer *buf, const void *data, size_t len ) {
  assert( buf != NULL );

  if ( message_buffer_remain_bytes( buf ) < len ) {
    return false;
  }

  if ( ( buf->head_offset + buf->data_length + len ) <= buf->size ) {
    memcpy( ( char * ) get_message_buffer_head( buf ) + buf->data_length, data, len );
  }
  else {
    memmove( buf->buffer, ( char * ) get_message_buffer_head( buf ), buf->data_length );
    buf->head_offset = 0;
    memcpy( ( char * ) buf->buffer + buf->data_length, data, len );
  }
  buf->data_length += len;

  return true;
}


static bool
push_message_to_send_queue( const char *service_name, const uint8_t message_type, const uint16_t tag, const void *data, size_t len ) {
  assert( service_name != NULL );

  debug( "Pushing a message to send queue ( service_name = %s, message_type = %#x, tag = %#x, data = %p, len = %u ).",
         service_name, message_type, tag, data, len );

  message_header header;

  if ( send_queues == NULL ) {
    error( "All send queues are already deleted or not created yet." );
    return false;
  }

  send_queue *sq = lookup_hash_entry( send_queues, service_name );

  if ( NULL == sq ) {
    sq = create_send_queue( service_name );
    assert( sq != NULL );
  }

  header.version = 0;
  header.message_type = message_type;
  header.tag = htons( tag );
  uint32_t length = ( uint32_t ) ( sizeof( message_header ) + len );
  header.message_length = htonl( length );

  if ( message_buffer_remain_bytes( sq->buffer ) < length ) {
    if ( sq->overflow == 0 ) {
      warn( "Could not write a message to send queue due to overflow ( service_name = %s, fd = %u, length = %u ).", sq->service_name, sq->server_socket, length );
    }
    ++sq->overflow;
    sq->overflow_total_length += length;
    send_dump_message( MESSENGER_DUMP_SEND_OVERFLOW, sq->service_name, NULL, 0 );
    return false;
  }
  if ( sq->overflow > 1 ) {
    warn( "Could not write a message to send queue due to overflow ( service_name = %s, fd = %u, count = %u, total length = %" PRIu64 " ).", sq->service_name, sq->server_socket, sq->overflow, sq->overflow_total_length );
  }
  sq->overflow = 0;
  sq->overflow_total_length = 0;

  write_message_buffer( sq->buffer, &header, sizeof( message_header ) );
  write_message_buffer( sq->buffer, data, len );

  if ( sq->server_socket == -1 ) {
    debug( "Tried to send message on closed send queue, connecting..." );

    send_queue_try_connect( sq );
    return true;
  }

  set_writable( sq->server_socket, true );
  if ( sq->buffer->data_length > messenger_send_length_for_flush ) {
    on_send_write( sq->server_socket, sq );
  }
  return true;
}


static bool
_send_message( const char *service_name, const uint16_t tag, const void *data, size_t len ) {
  assert( service_name != NULL );

  debug( "Sending a message ( service_name = %s, tag = %#x, data = %p, len = %u ).",
         service_name, tag, data, len );

  return push_message_to_send_queue( service_name, MESSAGE_TYPE_NOTIFY, tag, data, len );
}
bool ( *send_message )( const char *service_name, const uint16_t tag, const void *data, size_t len ) = _send_message;


static messenger_context *
insert_context( void *user_data ) {
  messenger_context *context = xmalloc( sizeof( messenger_context ) );

  context->transaction_id = ++last_transaction_id;
  context->life_count = 10;
  context->user_data = user_data;

  debug( "Inserting a new context ( transaction_id = %#x, life_count = %d, user_data = %p ).",
         context->transaction_id, context->life_count, context->user_data );

  messenger_context *old = insert_hash_entry( context_db, &context->transaction_id, context );
  if ( old != NULL ) {
    delete_context( old );
  }

  return context;
}


static bool
_send_request_message( const char *to_service_name, const char *from_service_name, const uint16_t tag, const void *data, size_t len, void *user_data ) {
  assert( to_service_name != NULL );
  assert( from_service_name != NULL );

  debug( "Sending a request message ( to_service_name = %s, from_service_name = %s, tag = %#x, data = %p, len = %u, user_data = %p ).",
         to_service_name, from_service_name, tag, data, len, user_data );

  char *request_data, *p;
  size_t from_service_name_len = strlen( from_service_name ) + 1;
  size_t handle_len = sizeof( messenger_context_handle ) + from_service_name_len;
  messenger_context *context;
  messenger_context_handle *handle;
  bool return_value;

  context = insert_context( user_data );

  request_data = xmalloc( handle_len + len );
  handle = ( messenger_context_handle * ) request_data;
  handle->transaction_id = htonl( context->transaction_id );
  handle->service_name_len = htons( ( uint16_t ) from_service_name_len );
  strcpy( handle->service_name, from_service_name );
  p = request_data + handle_len;
  memcpy( p, data, len );

  return_value = push_message_to_send_queue( to_service_name, MESSAGE_TYPE_REQUEST, tag, request_data, handle_len + len );

  xfree( request_data );

  return return_value;
}
bool ( *send_request_message )( const char *to_service_name, const char *from_service_name, const uint16_t tag, const void *data, size_t len, void *user_data ) = _send_request_message;


static bool
_send_reply_message( const messenger_context_handle *handle, const uint16_t tag, const void *data, size_t len ) {
  assert( handle != NULL );

  debug( "Sending a reply message ( handle = [ transaction_id = %#x, service_name_len = %u, service_name = %s ], "
         "tag = %#x, data = %p, len = %u ).",
         handle->transaction_id, handle->service_name_len, handle->service_name, tag, data, len );

  char *reply_data;
  messenger_context_handle *reply_handle;
  bool return_value;

  reply_data = xmalloc( sizeof( messenger_context_handle ) + len );
  reply_handle = ( messenger_context_handle * ) reply_data;
  reply_handle->transaction_id = htonl( handle->transaction_id );
  reply_handle->service_name_len = htons( 0 );
  memcpy( reply_handle->service_name, data, len );

  return_value = push_message_to_send_queue( handle->service_name, MESSAGE_TYPE_REPLY, tag, reply_data, sizeof( messenger_context_handle ) + len );

  xfree( reply_data );

  return return_value;
}
bool ( *send_reply_message )( const messenger_context_handle *handle, const uint16_t tag, const void *data, size_t len ) = _send_reply_message;


static bool
_clear_send_queue( const char *service_name ) {
  assert( service_name != NULL );

  debug( "Deleting all messages from send queue ( service_name = %s ).", service_name );

  if ( send_queues == NULL ) {
    error( "All send queues are already deleted or not created yet." );
    return false;
  }

  send_queue *sq = lookup_hash_entry( send_queues, service_name );

  if ( NULL == sq ) {
    error( "Send queue is already deleted or not created yet ( service_name = %s ).", service_name );
    return false;
  }
  if ( NULL == sq->buffer ) {
    error( "Message buffer is already deleted or not created yet ( send_queue = %p, service_name = %s ).",
           sq, service_name );
    return false;
  }

  if ( sq->buffer->data_length > 0 ) {
    set_writable( sq->server_socket, false );
  }

  sq->buffer->head_offset = 0;
  sq->buffer->data_length = 0;

  return true;
}
bool ( *clear_send_queue )( const char *service_name ) = _clear_send_queue;


static void
number_of_send_queue( int *connected_count, int *sending_count, int *reconnecting_count, int *closed_count ) {
  assert( connected_count != NULL );
  assert( sending_count != NULL );
  assert( reconnecting_count != NULL );
  assert( closed_count != NULL );

  debug( "Checking queue statuses." );

  hash_iterator iter;
  hash_entry *e;

  *connected_count = 0;
  *sending_count = 0;
  *reconnecting_count = 0;
  *closed_count = 0;

  if ( send_queues == NULL ) {
    error( "All send queues are already deleted or not created yet." );
    return;
  }

  init_hash_iterator( send_queues, &iter );
  while ( ( e = iterate_hash_next( &iter ) ) != NULL ) {
    send_queue *sq = e->value;
    if ( sq->server_socket != -1 ) {
      if ( sq->buffer->data_length == 0 ) {
          ( *connected_count )++;
      }
      else {
        if ( sq->refused_count > 0 ) {
            ( *reconnecting_count )++;
        }
        else {
            ( *sending_count )++;
        }
      }
    }
    else {
        ( *closed_count )++;
    }
  }

  debug( "connected_count = %d, reconnecting_count = %d, sending_count = %d, closed_count = %d.",
         *connected_count, *reconnecting_count, *sending_count, *closed_count );
}


static void
add_recv_queue_client_fd( receive_queue *rq, int fd ) {
  assert( rq != NULL );
  assert( fd >= 0 );

  debug( "Adding a client fd to receive queue ( fd = %d, service_name = %s ).", fd, rq->service_name );

  messenger_socket *socket;

  socket = xmalloc( sizeof( messenger_socket ) );
  socket->fd = fd;
  insert_after_dlist( rq->client_sockets, socket );

  set_fd_handler( fd, on_recv, rq, NULL, NULL );
  set_readable( fd, true );
}


static void
on_accept( int fd, void *data ) {
  receive_queue *rq = ( receive_queue* )data;

  assert( rq != NULL );

  int client_fd;
  struct sockaddr_un addr;

  socklen_t addr_len = sizeof( struct sockaddr_un );

  if ( ( client_fd = accept( fd, ( struct sockaddr * ) &addr, &addr_len ) ) == -1 ) {
    error( "Failed to accept ( fd = %d, errno = %s [%d] ).", fd, strerror( errno ), errno );
    return;
  }

  if ( geteuid() == 0 ) {
    int rmem_size = 1048576;
    int ret = setsockopt( client_fd, SOL_SOCKET, SO_RCVBUFFORCE, ( const void * ) &rmem_size, ( socklen_t ) sizeof( rmem_size ) );
    if ( ret < 0 ) {
      error( "Failed to set SO_RCVBUFFORCE to %d ( %s [%d] ).", rmem_size, strerror( errno ), errno );
      close( client_fd );
      return;
    }
  }
  int ret = fcntl( client_fd, F_SETFL, O_NONBLOCK );
  if ( ret < 0 ) {
    error( "Failed to set O_NONBLOCK ( %s [%d] ).", strerror( errno ), errno );
    close( client_fd );
    return;
  }

  add_recv_queue_client_fd( rq, client_fd );
  send_dump_message( MESSENGER_DUMP_RECV_CONNECTED, rq->service_name, NULL, 0 );
}


static int
del_recv_queue_client_fd( receive_queue *rq, int fd ) {
  assert( rq != NULL );
  assert( fd >= 0 );

  messenger_socket *socket;
  dlist_element *element;

  debug( "Deleting a client fd from receive queue ( fd = %d, service_name = %s ).", fd, rq->service_name );

  for ( element = rq->client_sockets->next; element; element = element->next ) {
    socket = element->data;
    if ( socket->fd == fd ) {
      set_readable( fd, false );
      delete_fd_handler( fd );

      debug( "Deleting fd ( %d ).", fd );
      delete_dlist_element( element );
      xfree( socket );
      return 1;
    }
  }

  return 0;
}


static void
truncate_message_buffer( message_buffer *buf, size_t len ) {
  assert( buf != NULL );

  if ( len == 0 || buf->data_length == 0 ) {
    return;
  }

  if ( len > buf->data_length ) {
    len = buf->data_length;
  }

  if ( ( buf->head_offset + len ) <= buf->size ) {
    buf->head_offset += len;
  }
  else {
    memmove( buf->buffer, ( char * ) buf->buffer + buf->head_offset + len, buf->data_length - len );
    buf->head_offset = 0;
  }
  buf->data_length -= len;
}


/**
 * pulls message data from recv_queue.
 * returns 1 if succeeded, otherwise 0.
 */
static int
pull_from_recv_queue( receive_queue *rq, uint8_t *message_type, uint16_t *tag, void *data, size_t *len, size_t maxlen ) {
  assert( rq != NULL );
  assert( message_type != NULL );
  assert( tag != NULL );
  assert( data != NULL );
  assert( len != NULL );

  debug( "Pulling a message from receive queue ( service_name = %s ).", rq->service_name );

  message_header *header;

  if ( rq->buffer->data_length < sizeof( message_header ) ) {
    debug( "Queue length is smaller than a message header ( queue length = %u ).", rq->buffer->data_length );
    return 0;
  }

  header = ( message_header * ) get_message_buffer_head( rq->buffer );

  uint32_t length = ntohl( header->message_length );
  assert( length != 0 );
  assert( length < messenger_recv_queue_length );
  if ( rq->buffer->data_length < length ) {
    debug( "Queue length is smaller than message length ( queue length = %u, message length = %u ).",
           rq->buffer->data_length, length );
    return 0;
  }

  *message_type = header->message_type;
  *tag = ntohs( header->tag );
  *len = length - sizeof( message_header );
  memcpy( data, header->value, *len > maxlen ? maxlen : *len );
  truncate_message_buffer( rq->buffer, length );

  debug( "A message is retrieved from receive queue ( message_type = %#x, tag = %#x, len = %u, data = %p ).",
         *message_type, *tag, *len, data );

  return 1;
}


static messenger_context *
get_context( uint32_t transaction_id ) {
  debug( "Looking up a context ( transaction_id = %#x ).", transaction_id );

  return lookup_hash_entry( context_db, &transaction_id );
}


static void
call_message_callbacks( receive_queue *rq, const uint8_t message_type, const uint16_t tag, void *data, size_t len ) {
  assert( rq != NULL );

  dlist_element *element;
  receive_queue_callback *cb;

  debug( "Calling message callbacks ( service_name = %s, message_type = %#x, tag = %#x, data = %p, len = %u ).",
         rq->service_name, message_type, tag, data, len );

  for ( element = rq->message_callbacks->next; element; element = element->next ) {
    cb = element->data;
    if ( cb->message_type != message_type ) {
      continue;
    }
    switch ( message_type ) {
    case MESSAGE_TYPE_NOTIFY:
      {
        void ( *received_callback )( uint16_t tag, void *data, size_t len );
        received_callback = cb->function;

        debug( "Calling a callback ( %p ) for MESSAGE_TYPE_NOTIFY (%#x) ( tag = %#x, data = %p, len = %u ).",
               cb->function, message_type, tag, data, len );

        received_callback( tag, data, len );
      }
      break;
    case MESSAGE_TYPE_REQUEST:
      {
        void ( *requested_callback )( const messenger_context_handle *handle, uint16_t tag, void *data, size_t len );
        messenger_context_handle *handle;
        char *requested_data;
        size_t header_len;

        requested_callback = cb->function;
        handle = ( messenger_context_handle * ) data;
        handle->transaction_id = ntohl( handle->transaction_id );
        handle->service_name_len = ntohs( handle->service_name_len );
        header_len = sizeof( messenger_context_handle ) + handle->service_name_len;
        requested_data = ( ( char * ) data ) + header_len;

        debug( "Calling a callback ( %p ) for MESSAGE_TYPE_REQUEST (%#x) ( handle = %p, tag = %#x, requested_data = %p, len = %u ).",
               cb->function, message_type, handle, tag, requested_data, len - header_len );

        requested_callback( handle, tag, ( void * ) requested_data, len - header_len );
      }
      break;
    case MESSAGE_TYPE_REPLY:
      {
        debug( "Calling a callback ( %p ) for MESSAGE_TYPE_REPLY (%#x).", cb->function, message_type );

        void ( *replied_callback )( uint16_t tag, void *data, size_t len, void *user_data );
        messenger_context_handle *reply_handle;
        messenger_context *context;

        replied_callback = cb->function;
        reply_handle = data;
        reply_handle->transaction_id = ntohl( reply_handle->transaction_id );
        reply_handle->service_name_len = ntohs( reply_handle->service_name_len );

        context = get_context( reply_handle->transaction_id );

        if ( NULL != context ) {
          debug( "tag = %#x, data = %p, len = %u, user_data = %p.",
                 tag, reply_handle->service_name, len - sizeof( messenger_context_handle ), context->user_data );
          replied_callback( tag, reply_handle->service_name, len - sizeof( messenger_context_handle ), context->user_data );
          delete_context( context );
        }
        else {
          warn( "No context found." );
        }
      }
      break;
    default:
      error( "Unknown message type ( %#x ).", message_type );
      assert( 0 );
      break;
    }
  }
}


static void
on_recv( int fd, void *data ) {
  receive_queue *rq = ( receive_queue* )data;

  assert( rq != NULL );
  assert( fd >= 0 );

  debug( "Receiving data from remote ( fd = %d, service_name = %s ).", fd, rq->service_name );

  uint8_t buf[ MESSENGER_RECV_BUFFER ];
  ssize_t recv_len;
  size_t buf_len;
  uint8_t message_type;
  uint16_t tag;

  while ( ( buf_len = message_buffer_remain_bytes( rq->buffer ) ) > messenger_recv_queue_reserved ) {
    if ( buf_len > sizeof( buf ) ) {
      buf_len = sizeof( buf );
    }
    recv_len = recv( fd, buf, buf_len, 0 );
    if ( recv_len == -1 ) {
      if ( errno != EAGAIN && errno != EWOULDBLOCK ) {
        error( "Failed to recv ( fd = %d, errno = %s [%d] ).", fd, strerror( errno ), errno );
        send_dump_message( MESSENGER_DUMP_RECV_CLOSED, rq->service_name, NULL, 0 );
        del_recv_queue_client_fd( rq, fd );
        close( fd );
      }
      else {
        debug( "Failed to recv ( fd = %d, errno = %s [%d] ).", fd, strerror( errno ), errno );
      }
      break;
    }
    else if ( recv_len == 0 ) {
      debug( "Connection closed ( fd = %d, service_name = %s ).", fd, rq->service_name );
      send_dump_message( MESSENGER_DUMP_RECV_CLOSED, rq->service_name, NULL, 0 );
      del_recv_queue_client_fd( rq, fd );
      close( fd );
      break;
    }

    if ( !write_message_buffer( rq->buffer, buf, ( size_t ) recv_len ) ) {
      warn( "Could not write a message to receive queue due to overflow ( service_name = %s, len = %u ).", rq->service_name, recv_len );
      send_dump_message( MESSENGER_DUMP_RECV_OVERFLOW, rq->service_name, buf, ( uint32_t ) recv_len );
    }
    else {
      debug( "Pushing a message to receive queue ( service_name = %s, len = %u ).", rq->service_name, recv_len );
      send_dump_message( MESSENGER_DUMP_RECEIVED, rq->service_name, buf, ( uint32_t ) recv_len );
    }
  }

  while ( pull_from_recv_queue( rq, &message_type, &tag, buf, &buf_len, sizeof( buf ) ) == 1 ) {
    call_message_callbacks( rq, message_type, tag, buf, buf_len );
  }
}


static uint32_t
get_send_data( send_queue *sq, size_t offset ) {
  assert( sq != NULL );

  uint32_t bucket_size = messenger_bucket_size;
  if ( sq->socket_buffer_size != 0 ) {
    int used;
    if ( ioctl( sq->server_socket, SIOCOUTQ, &used ) == 0 ) {
      if ( used < sq->socket_buffer_size ) {
        bucket_size = ( uint32_t ) ( sq->socket_buffer_size - used ) << 1;
    if ( bucket_size > messenger_bucket_size ) {
      bucket_size = messenger_bucket_size;
    }
      }
      else {
        bucket_size = 1;
      }
    }
  }

  uint32_t length = 0;
  message_header *header;
  while ( ( sq->buffer->data_length - offset ) >= sizeof( message_header ) ) {
    header = ( message_header * ) ( ( char * ) get_message_buffer_head( sq->buffer ) + offset );
    uint32_t message_length = ntohl( header->message_length );
    assert( message_length != 0 );
    assert( message_length < messenger_recv_queue_length );
    if ( length + message_length > bucket_size ) {
      if ( length == 0 ) {
        length = message_length;
      }
      break;
    }
    length += message_length;
    offset += message_length;
  }
  return length;
}


static void
on_send_read( int fd, void *data ) {
  UNUSED( fd );

  char buf[ 256 ];
  send_queue *sq = ( send_queue* )data;

  if ( recv( sq->server_socket, buf, sizeof( buf ), 0 ) <= 0 ) {
    send_dump_message( MESSENGER_DUMP_SEND_CLOSED, sq->service_name, NULL, 0 );

    set_readable( sq->server_socket, false );
    set_writable( sq->server_socket, false );
    delete_fd_handler( sq->server_socket );

    close( sq->server_socket );
    sq->server_socket = -1;

    // Tries to reconnecting immediately, else adds a reconnect timer.
    if ( sq->buffer->data_length > 0 ) {
      send_queue_try_connect( sq );
    }
    else {
      delete_send_queue( sq );
    }
  }
}


static void
on_send_write( int fd, void *data ) {
  send_queue *sq = ( send_queue* )data;

  assert( sq != NULL );
  assert( fd >= 0 );

  debug( "Sending data to remote ( fd = %d, service_name = %s, buffer = %p, data_length = %u ).",
         fd, sq->service_name, get_message_buffer_head( sq->buffer ), sq->buffer->data_length );

  if ( sq->buffer->data_length < sizeof( message_header ) ) {
    set_writable( sq->server_socket, false );
    return;
  }

  void *send_data;
  size_t send_len;
  ssize_t sent_len;
  size_t sent_total = 0;

  while ( ( send_len = get_send_data( sq, sent_total ) ) > 0 ) {
    send_data = ( ( char * ) get_message_buffer_head( sq->buffer ) + sent_total );
    sent_len = send( fd, send_data, send_len, MSG_DONTWAIT );
    if ( sent_len == -1 ) {
      int err = errno;
      if ( err != EAGAIN && err != EWOULDBLOCK ) {
        error( "Failed to send ( service_name = %s, fd = %d, errno = %s [%d] ).",
               sq->service_name, fd, strerror( err ), err );
        send_dump_message( MESSENGER_DUMP_SEND_CLOSED, sq->service_name, NULL, 0 );

        set_readable( sq->server_socket, false );
        set_writable( sq->server_socket, false );
        delete_fd_handler( sq->server_socket );

        close( sq->server_socket );
        sq->server_socket = -1;
        sq->refused_count = 0;

        // Tries to reconnecting immediately, else adds a reconnect timer.
        send_queue_try_connect( sq );
      }
      truncate_message_buffer( sq->buffer, sent_total );
      if ( err == EMSGSIZE || err == ENOBUFS || err == ENOMEM ) {
        warn( "Dropping %u bytes data in send queue ( service_name = %s ).", sq->buffer->data_length, sq->service_name );
        truncate_message_buffer( sq->buffer, sq->buffer->data_length );
      }
      return;
    }
    assert( sent_len != 0 );
    assert( send_len == ( size_t ) sent_len );
    send_dump_message( MESSENGER_DUMP_SENT, sq->service_name, send_data, ( uint32_t ) sent_len );
    sent_total += ( size_t ) sent_len;
  }

  truncate_message_buffer( sq->buffer, sent_total );
  if ( sq->buffer->data_length == 0 ) {
    set_writable( sq->server_socket, false );
  }
}


int
flush_messenger() {
  int connected_count, sending_count, reconnecting_count, closed_count;

  debug( "Flushing send queues." );

  while ( true ) {
    number_of_send_queue( &connected_count, &sending_count, &reconnecting_count, &closed_count );
    if ( sending_count == 0 ) {
      return reconnecting_count;
    }
    run_event_handler_once( 100000 );
  }
  return 0;
}


bool
start_messenger() {
  debug( "Starting messenger." );

  add_periodic_event_callback( 10, age_context_db, NULL );

  return true;
}


bool
stop_messenger() {
  debug( "Terminating messenger." );

  return true;
}


void
start_messenger_dump( const char *dump_app_name, const char *dump_service_name ) {
  assert( dump_app_name != NULL );
  assert( dump_service_name != NULL );

  debug( "Starting a message dumper ( dump_app_name = %s, dump_service_name = %s ).",
         dump_app_name, dump_service_name );

  if ( messenger_dump_enabled() ) {
    stop_messenger_dump();
  }
  _dump_service_name = xstrdup( dump_service_name );
  _dump_app_name = xstrdup( dump_app_name );
}


void
stop_messenger_dump( void ) {
  assert( _dump_service_name != NULL );
  assert( _dump_app_name != NULL );

  debug( "Terminating a message dumper ( dump_app_name = %s, dump_service_name = %s ).",
         _dump_app_name, _dump_service_name );

  assert( send_queues != NULL );
  send_queue *sq = lookup_hash_entry( send_queues, _dump_service_name );
  if ( sq != NULL ) {
    delete_send_queue( sq );
  }

  xfree( _dump_service_name );
  _dump_service_name = NULL;

  xfree( _dump_app_name );
  _dump_app_name = NULL;
}


bool
messenger_dump_enabled( void ) {
  if ( _dump_service_name != NULL && _dump_app_name != NULL ) {
    return true;
  }

  return false;
}


/*
 * Local variables:
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * End:
 */