gstreamer/src/webrtc/mp_web/media.cc
#include <gst/gst.h>
#include <gst/sdp/sdp.h>
#include <gst/webrtc/webrtc.h>
/* For signalling */
#include <libsoup/soup.h>
#include <json-glib/json-glib.h>
#include <string.h>
#include "media.h"
static GMainLoop *loop;
// static const gchar *server_url = "wss://webrtc.nirbheek.in:8443";
static const gchar *server_url = "ws://172.16.64.58:8443";
static gboolean strict_ssl = FALSE;
// static GOptionEntry entries[] =
// {
// {"peer-id", 0, 0, G_OPTION_ARG_STRING, &peer_id, "String ID of the peer to connect to", "ID"},
// {"server", 0, 0, G_OPTION_ARG_STRING, &server_url, "Signalling server to connect to", "URL"},
// {NULL},
// };
static gboolean
cleanup_and_quit_loop(const gchar *msg, WebRTC *ep, enum AppState state)
{
if (msg)
g_printerr("%s\n", msg);
if (state > 0)
ep->app_state = state;
if (ep->ws_conn) {
if (soup_websocket_connection_get_state(ep->ws_conn) ==
SOUP_WEBSOCKET_STATE_OPEN)
/* This will call us again */
soup_websocket_connection_close(ep->ws_conn, 1000, "");
else
g_object_unref(ep->ws_conn);
}
if (loop) {
g_main_loop_quit(loop);
loop = NULL;
}
/* To allow usage as a GSourceFunc */
return G_SOURCE_REMOVE;
}
static gchar *
get_string_from_json_object(JsonObject *object)
{
JsonNode *root;
JsonGenerator *generator;
gchar *text;
/* Make it the root node */
root = json_node_init_object(json_node_alloc(), object);
generator = json_generator_new();
json_generator_set_root(generator, root);
text = json_generator_to_data(generator, NULL);
/* Release everything */
g_object_unref(generator);
json_node_free(root);
return text;
}
GstPadProbeReturn on_monitor_data(GstPad *pad, GstPadProbeInfo *info, gpointer rtspclient)
{
// if (!GST_BUFFER_FLAG_IS_SET(info->data, GST_BUFFER_FLAG_DELTA_UNIT)) {
// static int cnt = 0;
// GDateTime *date_time = g_date_time_new_now_local();
// gchar *s_date_time = g_date_time_format(date_time, "%H:%M:%S,%F");
// g_warning("Received keyframe(%u) @ (%s)", cnt++, s_date_time);
// g_free(s_date_time);
// g_date_time_unref(date_time);
// }
g_print("-");
return GST_PAD_PROBE_OK;
}
static void
on_incoming_stream(GstElement *webrtc_element, GstPad *new_pad, WebRTC *ep)
{
WebRTC *webrtc = ep;
GstElement *out = NULL;
GstPad *sink = NULL;
GstCaps *caps;
GstStructure *s;
const gchar *encoding_name;
if (GST_PAD_DIRECTION(new_pad) != GST_PAD_SRC)
return;
caps = gst_pad_get_current_caps(new_pad);
if (!caps)
caps = gst_pad_query_caps(new_pad, NULL);
s = gst_caps_get_structure(caps, 0);
encoding_name = gst_structure_get_string(s, "encoding-name");
#ifdef USE_VP8
if (g_strcmp0(encoding_name, "VP8") == 0) {
out = gst_parse_bin_from_description(
// "rtpvp8depay ! tee name=local_tee allow-not-linked=true ! queue ! vp8dec ! videoconvert ! ximagesink",
"rtpvp8depay ! tee name=local_tee allow-not-linked=true",
TRUE,
NULL);
#elif USE_H264
if (g_strcmp0(encoding_name, "H264") == 0) {
out = gst_parse_bin_from_description(
"rtph264depay ! tee name=local_tee allow-not-linked=true",
TRUE,
NULL);
#endif
g_warn_if_fail(gst_bin_add(GST_BIN(out), webrtc->video_input_joint_.upstream_joint));
GstElement *local_tee = gst_bin_get_by_name_recurse_up(GST_BIN(out), "local_tee");
g_warn_if_fail(gst_element_link(local_tee, webrtc->video_input_joint_.upstream_joint));
// GstPad *pad = gst_element_get_static_pad(webrtc->audio_input_joint_.upstream_joint, "sink");
// gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, on_monitor_data, NULL, NULL);
// gst_object_unref(pad);
gst_object_unref(local_tee);
} else if (g_strcmp0(encoding_name, "PCMA") == 0) {
printf("~~~~~~~~~~~~~~~~~pcma~~~~~~~~~~~\n");
out = gst_parse_bin_from_description(
"rtppcmadepay ! tee name=local_audio_tee allow-not-linked=true ! queue ! alawdec ! audioconvert ! spectrascope shader=3 ! ximagesink sync=false",
// "rtppcmadepay ! tee name=local_audio_tee allow-not-linked=true",
TRUE,
NULL);
g_warn_if_fail(gst_bin_add(GST_BIN(out), webrtc->audio_input_joint_.upstream_joint));
GstElement *local_tee = gst_bin_get_by_name_recurse_up(GST_BIN(out), "local_audio_tee");
g_warn_if_fail(gst_element_link(local_tee, webrtc->audio_input_joint_.upstream_joint));
GstPad *pad = gst_element_get_static_pad(webrtc->audio_input_joint_.upstream_joint, "sink");
gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, on_monitor_data, NULL, NULL);
gst_object_unref(pad);
gst_object_unref(local_tee);
} else {
g_critical("Unknown encoding name %s", encoding_name);
g_assert_not_reached();
}
gst_bin_add(GST_BIN(webrtc->pipe), out);
gst_element_sync_state_with_parent(out);
sink = (GstPad *)out->sinkpads->data;
gst_pad_link(new_pad, sink);
gst_caps_unref(caps);
}
static void
send_ice_candidate_message(GstElement *webrtc G_GNUC_UNUSED, guint mlineindex, gchar *candidate, WebRTC *ep)
{
gchar *text;
JsonObject *ice, *msg;
if (ep->app_state < PEER_CALL_NEGOTIATING) {
cleanup_and_quit_loop("Can't send ICE, not in call", ep, APP_STATE_ERROR);
return;
}
ice = json_object_new();
json_object_set_string_member(ice, "candidate", candidate);
json_object_set_int_member(ice, "sdpMLineIndex", mlineindex);
msg = json_object_new();
json_object_set_object_member(msg, "ice", ice);
text = get_string_from_json_object(msg);
json_object_unref(msg);
g_print("==========>send ice candidate:\n%s\n", text);
soup_websocket_connection_send_text(ep->ws_conn, text);
g_free(text);
}
static void
send_sdp_offer(GstWebRTCSessionDescription *offer, WebRTC *ep)
{
gchar *text;
JsonObject *msg, *sdp;
if (ep->app_state < PEER_CALL_NEGOTIATING) {
cleanup_and_quit_loop("Can't send offer, not in call", ep, APP_STATE_ERROR);
return;
}
text = gst_sdp_message_as_text(offer->sdp);
sdp = json_object_new();
json_object_set_string_member(sdp, "type", "offer");
json_object_set_string_member(sdp, "sdp", text);
g_free(text);
msg = json_object_new();
json_object_set_object_member(msg, "sdp", sdp);
text = get_string_from_json_object(msg);
json_object_unref(msg);
g_print("==========>Sending offer:\n%s\n", text);
soup_websocket_connection_send_text(ep->ws_conn, text);
g_free(text);
}
/* Offer created by our pipeline, to be sent to the peer */
static void
on_offer_created(GstPromise *promise, gpointer data)
{
WebRTC *ep = (WebRTC *)data;
g_print("==========>on_offer_created\n");
GstWebRTCSessionDescription *offer = NULL;
const GstStructure *reply;
g_assert_cmphex(ep->app_state, ==, PEER_CALL_NEGOTIATING);
g_assert_cmphex(gst_promise_wait(promise), ==, GST_PROMISE_RESULT_REPLIED);
reply = gst_promise_get_reply(promise);
gst_structure_get(reply, "offer", GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL);
gst_promise_unref(promise);
gst_sdp_media_add_attribute((GstSDPMedia *)&g_array_index(offer->sdp->medias, GstSDPMedia, 0),
"fmtp",
"96 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f");
gchar *desc;
desc = gst_sdp_message_as_text(offer->sdp);
g_print("==========>Created offer:\n%s\n", desc);
g_free(desc);
promise = gst_promise_new();
g_signal_emit_by_name(ep->webrtc, "set-local-description", offer, promise);
gst_promise_interrupt(promise);
gst_promise_unref(promise);
/* Send offer to peer */
send_sdp_offer(offer, ep);
gst_webrtc_session_description_free(offer);
}
static void
on_negotiation_needed(GstElement *element, WebRTC *ep)
{
GstPromise *promise;
ep->app_state = PEER_CALL_NEGOTIATING;
promise = gst_promise_new_with_change_func(on_offer_created, ep, NULL);
g_signal_emit_by_name(ep->webrtc, "create-offer", NULL, promise);
}
#define STUN_SERVER " stun-server=stun://stun.l.google.com:19302 "
#define RTP_CAPS_AUDIO "application/x-rtp,media=audio,encoding-name=PCMA,payload="
#ifdef USE_VP8
#define RTP_CAPS_VIDEO "application/x-rtp,media=video,encoding-name=VP8,payload="
#elif USE_H264
#define RTP_CAPS_VIDEO "application/x-rtp,media=video,encoding-name=H264,payload="
#endif
static gboolean
start_pipeline(WebRTC *ep)
{
GstStateChangeReturn ret;
GError *error = NULL;
ep->pipe = gst_parse_launch(
"webrtcbin name=sendrecv "
// "videotestsrc pattern=ball ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay ! "
#ifdef USE_VP8
"queue name=video_joint ! rtpvp8pay ! " RTP_CAPS_VIDEO
#elif USE_H264
"queue name=video_joint ! rtph264pay config-interval=-1 ! " RTP_CAPS_VIDEO
#endif
"96 ! sendrecv. "
// "audiotestsrc wave=red-noise ! audioconvert ! audioresample ! queue ! alawenc ! rtppcmapay ! "
"queue name=audio_joint ! rtppcmapay ! " RTP_CAPS_AUDIO "8 ! sendrecv. ",
&error);
if (error) {
g_printerr("Failed to parse launch: %s\n", error->message);
g_error_free(error);
if (ep->pipe)
g_clear_object(&ep->pipe);
if (ep->webrtc)
ep->webrtc = NULL;
return FALSE;
}
ep->webrtc = gst_bin_get_by_name(GST_BIN(ep->pipe), "sendrecv");
g_assert_nonnull(ep->webrtc);
/* This is the gstwebrtc entry point where we create the offer and so on. It
* will be called when the pipeline goes to PLAYING. */
g_signal_connect(ep->webrtc, "on-negotiation-needed", G_CALLBACK(on_negotiation_needed), ep);
g_signal_connect(ep->webrtc, "on-ice-candidate", G_CALLBACK(send_ice_candidate_message), ep);
/* Incoming streams will be exposed via this signal */
g_signal_connect(ep->webrtc, "pad-added", G_CALLBACK(on_incoming_stream), ep);
/* Lifetime is the same as the pipeline itself */
gst_object_unref(ep->webrtc);
////////////////////////////////////////////////////////////////
static int session_count = 0;
// video
static std::string video_media_type = "video";
std::string video_output_pipejoint_name = std::string("webrtc_video_output_joint_") +
std::to_string(session_count);
ep->video_output_joint_ = make_pipe_joint(video_media_type, video_output_pipejoint_name);
g_warn_if_fail(gst_bin_add(GST_BIN(ep->pipe), ep->video_output_joint_.downstream_joint));
GstElement *video_joint = gst_bin_get_by_name_recurse_up(GST_BIN(ep->pipe), "video_joint");
g_warn_if_fail(gst_element_link(ep->video_output_joint_.downstream_joint, video_joint));
std::string video_input_pipejoint_name = std::string("webrtc_video_input_joint_") +
std::to_string(session_count);
ep->video_input_joint_ = make_pipe_joint(video_media_type, video_input_pipejoint_name);
// audio
static std::string audio_media_type = "audio";
std::string audio_output_pipejoint_name = std::string("webrtc_audio_output_joint_") +
std::to_string(session_count);
ep->audio_output_joint_ = make_pipe_joint(audio_media_type, audio_output_pipejoint_name);
g_warn_if_fail(gst_bin_add(GST_BIN(ep->pipe), ep->audio_output_joint_.downstream_joint));
GstElement *audio_joint = gst_bin_get_by_name_recurse_up(GST_BIN(ep->pipe), "audio_joint");
g_warn_if_fail(gst_element_link(ep->audio_output_joint_.downstream_joint, audio_joint));
std::string audio_input_pipejoint_name = std::string("webrtc_audio_input_joint_") +
std::to_string(session_count);
ep->audio_input_joint_ = make_pipe_joint(audio_media_type, audio_input_pipejoint_name);
session_count++;
////////////////////////////////////////////////////////////////
g_print("Starting pipeline\n");
ret = gst_element_set_state(GST_ELEMENT(ep->pipe), GST_STATE_PLAYING);
if (ret == GST_STATE_CHANGE_FAILURE)
goto err;
return TRUE;
err:
if (ep->pipe)
g_clear_object(&ep->pipe);
if (ep->webrtc)
ep->webrtc = NULL;
return FALSE;
}
static gboolean
setup_call(WebRTC *ep)
{
gchar *msg;
if (soup_websocket_connection_get_state(ep->ws_conn) !=
SOUP_WEBSOCKET_STATE_OPEN)
return FALSE;
if (!ep->peer_id)
return FALSE;
g_print("Setting up signalling server call with %s\n", ep->peer_id);
ep->app_state = PEER_CONNECTING;
msg = g_strdup_printf("SESSION %s", ep->peer_id);
soup_websocket_connection_send_text(ep->ws_conn, msg);
g_free(msg);
return TRUE;
}
static gboolean
register_with_server(WebRTC *ep)
{
gchar *hello;
gint32 our_id;
if (soup_websocket_connection_get_state(ep->ws_conn) !=
SOUP_WEBSOCKET_STATE_OPEN)
return FALSE;
our_id = g_random_int_range(10, 10000);
g_print("Registering id %i with server\n", our_id);
ep->app_state = SERVER_REGISTERING;
hello = g_strdup_printf("HELLO %i", our_id);
soup_websocket_connection_send_text(ep->ws_conn, hello);
g_free(hello);
return TRUE;
}
static void
on_server_closed(SoupWebsocketConnection *conn G_GNUC_UNUSED,
WebRTC *ep)
{
ep->app_state = SERVER_CLOSED;
cleanup_and_quit_loop("Server connection closed", ep, (AppState)0);
}
/* One mega message handler for our asynchronous calling mechanism */
static void
on_server_message(SoupWebsocketConnection *conn, SoupWebsocketDataType type, GBytes *message, WebRTC *ep)
{
printf("~~~~~~~~~~~~~~~~on_server_message, ep->peer_id:%s\n", ep->peer_id);
gsize size;
gchar *text, *data;
switch (type) {
case SOUP_WEBSOCKET_DATA_BINARY:
g_printerr("Received unknown binary message, ignoring\n");
g_bytes_unref(message);
return;
case SOUP_WEBSOCKET_DATA_TEXT:
data = (gchar *)g_bytes_unref_to_data(message, &size);
/* Convert to NULL-terminated string */
text = g_strndup(data, size);
g_free(data);
break;
default:
g_assert_not_reached();
}
/* Server has accepted our registration, we are ready to send commands */
if (g_strcmp0(text, "HELLO") == 0) {
if (ep->app_state != SERVER_REGISTERING) {
cleanup_and_quit_loop("ERROR: Received HELLO when not registering", ep, APP_STATE_ERROR);
goto out;
}
ep->app_state = SERVER_REGISTERED;
g_print("==========>Registered with server\n");
/* Ask signalling server to connect us with a specific peer */
if (!setup_call(ep)) {
cleanup_and_quit_loop("ERROR: Failed to setup call", ep, PEER_CALL_ERROR);
goto out;
}
/* Call has been setup by the server, now we can start negotiation */
} else if (g_strcmp0(text, "SESSION_OK") == 0) {
if (ep->app_state != PEER_CONNECTING) {
cleanup_and_quit_loop("ERROR: Received SESSION_OK when not calling", ep, PEER_CONNECTION_ERROR);
goto out;
}
g_print("==========>SESSION_OK start_pipeline!\n");
ep->app_state = PEER_CONNECTED;
/* Start negotiation (exchange SDP and ICE candidates) */
if (!start_pipeline(ep))
cleanup_and_quit_loop("ERROR: failed to start pipeline", ep, PEER_CALL_ERROR);
g_print("==========>SESSION_OK start_pipeline over!\n");
/* Handle errors */
} else if (g_str_has_prefix(text, "ERROR")) {
switch (ep->app_state) {
case SERVER_CONNECTING:
ep->app_state = SERVER_CONNECTION_ERROR;
break;
case SERVER_REGISTERING:
ep->app_state = SERVER_REGISTRATION_ERROR;
break;
case PEER_CONNECTING:
ep->app_state = PEER_CONNECTION_ERROR;
break;
case PEER_CONNECTED:
case PEER_CALL_NEGOTIATING:
ep->app_state = PEER_CALL_ERROR;
default:
ep->app_state = APP_STATE_ERROR;
}
cleanup_and_quit_loop(text, ep, (AppState)0);
/* Look for JSON messages containing SDP and ICE candidates */
} else {
g_print("\n=========>receive\n%s\n", text);
JsonNode *root;
JsonObject *object, *child;
JsonParser *parser = json_parser_new();
if (!json_parser_load_from_data(parser, text, -1, NULL)) {
g_printerr("Unknown message '%s', ignoring", text);
g_object_unref(parser);
goto out;
}
root = json_parser_get_root(parser);
if (!JSON_NODE_HOLDS_OBJECT(root)) {
g_printerr("Unknown json message '%s', ignoring", text);
g_object_unref(parser);
goto out;
}
object = json_node_get_object(root);
/* Check type of JSON message */
if (json_object_has_member(object, "sdp")) {
int ret;
GstSDPMessage *sdp;
const gchar *text, *sdptype;
GstWebRTCSessionDescription *answer;
g_assert_cmphex(ep->app_state, ==, PEER_CALL_NEGOTIATING);
child = json_object_get_object_member(object, "sdp");
if (!json_object_has_member(child, "type")) {
cleanup_and_quit_loop("ERROR: received SDP without 'type'", ep, PEER_CALL_ERROR);
goto out;
}
sdptype = json_object_get_string_member(child, "type");
/* In this example, we always create the offer and receive one answer.
* See tests/examples/webrtcbidirectional.c in gst-plugins-bad for how to
* handle offers from peers and reply with answers using webrtcbin. */
g_assert_cmpstr(sdptype, ==, "answer");
text = json_object_get_string_member(child, "sdp");
// g_print ("==========>Received answer:\n%s\n", text);
ret = gst_sdp_message_new(&sdp);
g_assert_cmphex(ret, ==, GST_SDP_OK);
ret = gst_sdp_message_parse_buffer((guint8 *)text, strlen(text), sdp);
g_assert_cmphex(ret, ==, GST_SDP_OK);
answer = gst_webrtc_session_description_new(GST_WEBRTC_SDP_TYPE_ANSWER,
sdp);
g_assert_nonnull(answer);
/* Set remote description on our pipeline */
{
GstPromise *promise = gst_promise_new();
g_signal_emit_by_name(ep->webrtc, "set-remote-description", answer, promise);
gst_promise_interrupt(promise);
gst_promise_unref(promise);
}
ep->app_state = PEER_CALL_STARTED;
} else if (json_object_has_member(object, "ice")) {
g_print("==========>Received ice\n");
const gchar *candidate;
gint sdpmlineindex;
child = json_object_get_object_member(object, "ice");
candidate = json_object_get_string_member(child, "candidate");
sdpmlineindex = json_object_get_int_member(child, "sdpMLineIndex");
/* Add ice candidate sent by remote peer */
g_signal_emit_by_name(ep->webrtc, "add-ice-candidate", sdpmlineindex, candidate);
} else {
g_printerr("Ignoring unknown JSON message:\n%s\n", text);
}
g_object_unref(parser);
}
out:
g_free(text);
}
static void
on_server_connected(SoupSession *session, GAsyncResult *res, WebRTC *ep)
{
printf("~~~~~~~~~~~~on_server_connected: peer_id %p , %s\n", ep, ep->peer_id);
GError *error = NULL;
ep->ws_conn = soup_session_websocket_connect_finish(session, res, &error);
if (error) {
cleanup_and_quit_loop(error->message, ep, SERVER_CONNECTION_ERROR);
g_error_free(error);
return;
}
g_assert_nonnull(ep->ws_conn);
ep->app_state = SERVER_CONNECTED;
g_print("Connected to signalling server\n");
g_signal_connect(ep->ws_conn, "closed", G_CALLBACK(on_server_closed), ep);
g_signal_connect(ep->ws_conn, "message", G_CALLBACK(on_server_message), ep);
/* Register with the server so it knows about us and can accept commands */
register_with_server(ep);
}
/*
* Connect to the signalling server. This is the entrypoint for everything else.
*/
static void
connect_to_websocket_server_async(WebRTC *ep)
{
SoupLogger *logger;
SoupMessage *message;
SoupSession *session;
const char *https_aliases[] = {"wss", NULL};
session = soup_session_new_with_options(SOUP_SESSION_SSL_STRICT, strict_ssl, SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE,
//SOUP_SESSION_SSL_CA_FILE, "/etc/ssl/certs/ca-bundle.crt",
SOUP_SESSION_HTTPS_ALIASES,
https_aliases,
NULL);
logger = soup_logger_new(SOUP_LOGGER_LOG_BODY, -1);
soup_session_add_feature(session, SOUP_SESSION_FEATURE(logger));
g_object_unref(logger);
message = soup_message_new(SOUP_METHOD_GET, server_url);
g_print("Connecting to server...\n");
/* Once connected, we will register */
printf("~~~~~~~~~~~~connect_to_websocket_server_async: peer_id %p , %s\n", ep, ep->peer_id);
soup_session_websocket_connect_async(session, message, NULL, NULL, NULL, (GAsyncReadyCallback)on_server_connected, ep);
ep->app_state = SERVER_CONNECTING;
}
/////////////////////////////////////////////////////////////////////////////////
WebRTC *create_ep(gchar *peer)
{
printf("~~~~~~~~~~~~~~~~~~peer: %s\n", peer);
WebRTC *ep = new WebRTC(peer);
connect_to_websocket_server_async(ep);
return ep;
}
// #define TEST
#ifdef TEST
int main(int argc, char *argv[])
{
gst_init(NULL, NULL);
create_ep("1234");
create_ep("1235");
loop = g_main_loop_new(NULL, FALSE);
g_main_loop_run(loop);
g_main_loop_unref(loop);
terminate();
return 0;
}
#endif