// yjjnls/Notes - gstreamer/src/webrtc/mp_web/mp_web.cc

#include <gst/gst.h>
#include <gst/sdp/sdp.h>

#include <gst/webrtc/webrtc.h>

#include <libsoup/soup.h>
#include <json-glib/json-glib.h>

#include <cstdio>

#include <list>
#include <string>
#include <mutex>
#include <algorithm>
#include "media.h"


#define RTP_CAPS_AUDIO "application/x-rtp,media=audio,encoding-name=PCMA,payload="
#ifdef USE_VP8
#define RTP_CAPS_VIDEO "application/x-rtp,media=video,encoding-name=VP8,payload="
#elif defined(USE_H264)
#define RTP_CAPS_VIDEO "application/x-rtp,media=video,encoding-name=H264,payload="
#endif
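
// Caps for the RTP streams exchanged with the endpoints. The trailing
// "payload=" is left open on purpose; the consumer (presumably the endpoint
// code behind media.h, as these macros are unused in this file) appends the
// dynamic payload type number.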

// static std::string pattern[] = {"smpte", "ball", "snow", "blink", "circular", "pinwheel", "spokes"};
static std::string pattern[] = {"white", "green", "blue", "blink", "snow", "red", "circular", "pinwheel", "spokes"};
static std::string freq[] = {"500", "800", "2000", "10000", "30000", "40000", "50000"};
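// Note: the pattern/freq tables above are not referenced in this file; they
// appear to be reserved for giving each endpoint a distinguishable test stream.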

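// A sink_link records one dynamically added branch: the request pad obtained
// from a tee (output branch, is_output == true) or from an input-selector
// (input branch), the joint element attached to it, and the owning pipeline.
// The *_probe_invoke_control flags make the idle-probe teardown below run
// exactly once per branch.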
struct sink_link
{
    GstElement *joint;
    GstPad *request_pad;
    void *pipeline;
    gboolean video_probe_invoke_control;
    gboolean audio_probe_invoke_control;
    bool is_output;

    sink_link(GstPad *pad, GstElement *joint_element, void *pipe, bool output = true)
        : joint(joint_element)
        , request_pad(pad)
        , pipeline(pipe)
        , video_probe_invoke_control(FALSE)
        , audio_probe_invoke_control(FALSE)
        , is_output(output)
    {
    }
};
/*---------------------------------------------------------------------------------------------*/
class MultiPoints
{
 public:
    MultiPoints()
        : video_tee_(NULL)
        , audio_tee_(NULL)
        , video_selector_(NULL)
        , audio_selector_(NULL)
        , default_video_src_(NULL)
        , default_audio_src_(NULL)
        , main_pipeline_(NULL)
        , speaker_(NULL)
    {
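        // Static skeleton of the mixing pipeline, one chain per media type:
        //   testsrc ! encode ! pay ! depay ! input-selector ! tee
        // The input-selector switches between the built-in test source (the
        // fallback input) and the members' input joints; the tee fans the
        // selected stream out to every member's output joint.
        // allow-not-linked=true lets the tees run before any member joins.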
#ifdef USE_VP8
        std::string launch =
            "videotestsrc ! timeoverlay valignment=3 halignment=4 time-mode=2 "
            "xpos=0 ypos=0 color=4278190080 font-desc=\"Sans 48\" "
            "draw-shadow=false draw-outline=true outline-color=4278190080 "
            "! vp8enc ! rtpvp8pay ! rtpvp8depay name=default_video_src "
            "! input-selector name=video-input-selector "
            "! tee name=video-tee allow-not-linked=true "
            "audiotestsrc ! alawenc ! rtppcmapay "
            "! rtppcmadepay name=default_audio_src "
            "! input-selector name=audio-input-selector "
            "! tee name=audio-tee allow-not-linked=true";
#elif defined(USE_H264)
        std::string launch =
            "videotestsrc ! timeoverlay valignment=3 halignment=4 time-mode=2 "
            "xpos=0 ypos=0 color=4278190080 font-desc=\"Sans 48\" "
            "draw-shadow=false draw-outline=true outline-color=4278190080 "
            "! x264enc ! rtph264pay config-interval=-1 "
            "! rtph264depay name=default_video_src "
            "! input-selector name=video-input-selector "
            "! tee name=video-tee allow-not-linked=true "
            "audiotestsrc ! alawenc ! rtppcmapay "
            "! rtppcmadepay name=default_audio_src "
            "! input-selector name=audio-input-selector "
            "! tee name=audio-tee allow-not-linked=true";
#endif
        GError *error = NULL;

        main_pipeline_ = gst_parse_launch(launch.c_str(), &error);

        if (error) {
            g_printerr("Failed to parse launch: %s\n", error->message);
            g_error_free(error);
            return;
        }

        video_tee_ = gst_bin_get_by_name(GST_BIN(main_pipeline_), "video-tee");
        video_selector_ = gst_bin_get_by_name(GST_BIN(main_pipeline_), "video-input-selector");
        audio_tee_ = gst_bin_get_by_name(GST_BIN(main_pipeline_), "audio-tee");
        audio_selector_ = gst_bin_get_by_name(GST_BIN(main_pipeline_), "audio-input-selector");

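        // Watch buffers leaving the video selector; on_monitor_data logs each
        // keyframe, presumably to verify that a speaker switch is followed by
        // a fresh keyframe.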
        GstPad *pad = gst_element_get_static_pad(video_selector_, "src");
        gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, on_monitor_data, NULL, NULL);
        gst_object_unref(pad);

        default_video_src_ = gst_bin_get_by_name(GST_BIN(main_pipeline_), "default_video_src");
        default_audio_src_ = gst_bin_get_by_name(GST_BIN(main_pipeline_), "default_audio_src");

        gst_element_set_state(main_pipeline_, GST_STATE_PLAYING);
    }
    ~MultiPoints()
    {
        if (speaker_) {
            // Point both selectors back at the built-in test sources before
            // tearing the member branches down.
            // video
            GstPad *video_output_src_pad = gst_element_get_static_pad(default_video_src_, "src");
            GstPad *video_selector_sink_pad = gst_pad_get_peer(video_output_src_pad);
            g_object_set(G_OBJECT(video_selector_), "active-pad", video_selector_sink_pad, NULL);
            gst_object_unref(video_selector_sink_pad);
            gst_object_unref(video_output_src_pad);
            // audio
            GstPad *audio_output_src_pad = gst_element_get_static_pad(default_audio_src_, "src");
            GstPad *audio_selector_sink_pad = gst_pad_get_peer(audio_output_src_pad);
            g_object_set(G_OBJECT(audio_selector_), "active-pad", audio_selector_sink_pad, NULL);
            gst_object_unref(audio_selector_sink_pad);
            gst_object_unref(audio_output_src_pad);
            g_usleep(1 * G_USEC_PER_SEC);
        }
        for (WebRTC *ep : members_) {
            remove_stream_output_joint(ep->video_output_pipejoint());
            remove_stream_input_joint(ep->video_input_pipejoint());
            remove_stream_output_joint(ep->audio_output_pipejoint());
            remove_stream_input_joint(ep->audio_input_pipejoint());
            delete ep;
        }
        members_.clear();

        if (main_pipeline_) {
            gst_element_set_state(main_pipeline_, GST_STATE_NULL);
            gst_object_unref(main_pipeline_);
            main_pipeline_ = NULL;
        }
        gst_object_unref(video_tee_);
        gst_object_unref(video_selector_);
        gst_object_unref(default_video_src_);
        gst_object_unref(audio_tee_);
        gst_object_unref(audio_selector_);
        gst_object_unref(default_audio_src_);
    }
    void generate_webrtc_endpoint(int num)
    {
        int id = 1234;
        for (int i = 0; i < num; ++i) {
            WebRTC *ep = create_ep((gchar *)std::to_string(id++).c_str());
            members_.push_back(ep);
        }
    }
    void join_room_all()
    {
        for (WebRTC *ep : members_) {
            add_member(ep);
        }
    }

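    // Each WebRTC endpoint exposes four joints: the input joints feed the
    // member's media into the selectors, the output joints receive the
    // selected speaker's media from the tees.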
    void add_member(WebRTC *ep)
    {
        GstElement *video_downstream_joint = ep->video_input_pipejoint();
        link_stream_input_joint(video_downstream_joint);
        GstElement *video_upstream_joint = ep->video_output_pipejoint();
        link_stream_output_joint(video_upstream_joint);
        GstElement *audio_downstream_joint = ep->audio_input_pipejoint();
        link_stream_input_joint(audio_downstream_joint);
        GstElement *audio_upstream_joint = ep->audio_output_pipejoint();
        link_stream_output_joint(audio_upstream_joint);

        if (speaker_ == NULL) {
            speaker_ = ep;
        }
    }
    void remove_member(WebRTC *ep)
    {
        // Detach all four joints, mirroring add_member.
        remove_stream_output_joint(ep->video_output_pipejoint());
        remove_stream_input_joint(ep->video_input_pipejoint());
        remove_stream_output_joint(ep->audio_output_pipejoint());
        remove_stream_input_joint(ep->audio_input_pipejoint());

        auto it = std::find(members_.begin(), members_.end(), ep);
        if (it != members_.end()) {
            members_.erase(it);
            printf("remove audience successfully!\n");
        }
    }

    void link_stream_output_joint(GstElement *upstream_joint)
    {
        g_assert_nonnull(upstream_joint);
        std::lock_guard<std::mutex> lck(tee_mutex_);

        gchar *media_type = (gchar *)g_object_get_data(G_OBJECT(upstream_joint), "media-type");
        g_assert_nonnull(media_type);

        GstElement *tee = NULL;
        if (g_str_equal(media_type, "video")) {
            tee = video_tee_;
        } else if (g_str_equal(media_type, "audio")) {
            tee = audio_tee_;
        } else {
            g_warn_if_reached();
            return;
        }

        // Request a new src pad on the tee, add the joint to the pipeline,
        // bring it up to the pipeline's state, then link tee -> joint.
        GstPadTemplate *templ = gst_element_class_get_pad_template(GST_ELEMENT_GET_CLASS(tee), "src_%u");
        GstPad *pad = gst_element_request_pad(tee, templ, NULL, NULL);
        sink_link *info = new sink_link(pad, upstream_joint, this);

        g_warn_if_fail(gst_bin_add(GST_BIN(main_pipeline_), upstream_joint));
        gst_element_sync_state_with_parent(upstream_joint);

        GstPad *sinkpad = gst_element_get_static_pad(upstream_joint, "sink");
        g_warn_if_fail(gst_pad_link(pad, sinkpad) == GST_PAD_LINK_OK);
        gst_object_unref(sinkpad);

        tee_sinks_.push_back(info);
        printf("~~~~~~~~~tee ---> %s\n", media_type);
    }

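    // Removal must not race the streaming thread, so the actual unlink and
    // release work is deferred to an idle pad probe
    // (on_request_pad_remove_*_probe); here we only schedule it and drop the
    // bookkeeping entry.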
    void remove_stream_output_joint(GstElement *upstream_joint)
    {
        std::lock_guard<std::mutex> lck(tee_mutex_);

        auto it = tee_sinks_.begin();
        for (; it != tee_sinks_.end(); ++it) {
            if ((*it)->joint == upstream_joint) {
                break;
            }
        }
        if (it == tee_sinks_.end()) {
            g_warn_if_reached();
            // TODO(yuanjunjie) notify application
            return;
        }

        gchar *media_type = (gchar *)g_object_get_data(G_OBJECT(upstream_joint), "media-type");
        g_assert_nonnull(media_type);
        if (g_str_equal(media_type, "video")) {
            (*it)->video_probe_invoke_control = TRUE;
            gst_pad_add_probe((*it)->request_pad, GST_PAD_PROBE_TYPE_IDLE, on_request_pad_remove_video_probe, *it, NULL);
        } else if (g_str_equal(media_type, "audio")) {
            (*it)->audio_probe_invoke_control = TRUE;
            gst_pad_add_probe((*it)->request_pad, GST_PAD_PROBE_TYPE_IDLE, on_request_pad_remove_audio_probe, *it, NULL);
        }
        tee_sinks_.erase(it);
    }

    void link_stream_input_joint(GstElement *downstream_joint)
    {
        g_assert_nonnull(downstream_joint);
        std::lock_guard<std::mutex> lck(selector_mutex_);

        gchar *media_type = (gchar *)g_object_get_data(G_OBJECT(downstream_joint), "media-type");
        g_assert_nonnull(media_type);

        GstElement *selector = NULL;
        if (g_str_equal(media_type, "video")) {
            selector = video_selector_;
        } else if (g_str_equal(media_type, "audio")) {
            selector = audio_selector_;
        } else {
            g_warn_if_reached();
            return;
        }

        // Request a new sink pad on the selector, add the joint to the
        // pipeline, bring it up to the pipeline's state, then link
        // joint -> selector.
        GstPadTemplate *templ = gst_element_class_get_pad_template(GST_ELEMENT_GET_CLASS(selector), "sink_%u");
        GstPad *pad = gst_element_request_pad(selector, templ, NULL, NULL);
        sink_link *info = new sink_link(pad, downstream_joint, this, false);

        g_warn_if_fail(gst_bin_add(GST_BIN(main_pipeline_), downstream_joint));
        gst_element_sync_state_with_parent(downstream_joint);

        GstPad *srcpad = gst_element_get_static_pad(downstream_joint, "src");
        g_warn_if_fail(gst_pad_link(srcpad, pad) == GST_PAD_LINK_OK);
        gst_object_unref(srcpad);

        selector_sinks_.push_back(info);
        printf("~~~~~~~~~%s ---> selector\n", media_type);
    }
    void remove_stream_input_joint(GstElement *downstream_joint)
    {
        std::lock_guard<std::mutex> lck(selector_mutex_);

        auto it = selector_sinks_.begin();
        for (; it != selector_sinks_.end(); ++it) {
            if ((*it)->joint == downstream_joint) {
                break;
            }
        }
        if (it == selector_sinks_.end()) {
            g_warn_if_reached();
            // TODO(yuanjunjie) notify application
            return;
        }

        gchar *media_type = (gchar *)g_object_get_data(G_OBJECT(downstream_joint), "media-type");
        g_assert_nonnull(media_type);
        if (g_str_equal(media_type, "video")) {
            (*it)->video_probe_invoke_control = TRUE;
            gst_pad_add_probe((*it)->request_pad, GST_PAD_PROBE_TYPE_IDLE, on_request_pad_remove_video_probe, *it, NULL);
        } else if (g_str_equal(media_type, "audio")) {
            (*it)->audio_probe_invoke_control = TRUE;
            gst_pad_add_probe((*it)->request_pad, GST_PAD_PROBE_TYPE_IDLE, on_request_pad_remove_audio_probe, *it, NULL);
        }
        selector_sinks_.erase(it);
    }
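    // Make ep the active speaker: point both input-selectors at its input
    // joints and request an immediate keyframe from its video sender (custom
    // upstream GstForceKeyUnit event) so the other members can decode the new
    // stream right away.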
    void set_speaker(WebRTC *ep)
    {
        // video
        GstElement *video_downstream_joint = ep->video_input_pipejoint();
        GstPad *video_output_src_pad = gst_element_get_static_pad(video_downstream_joint, "src");
        GstPad *video_selector_sink_pad = gst_pad_get_peer(video_output_src_pad);
        g_object_set(G_OBJECT(video_selector_), "active-pad", video_selector_sink_pad, NULL);
        gst_object_unref(video_selector_sink_pad);

        gst_pad_send_event(video_output_src_pad,
                           gst_event_new_custom(GST_EVENT_CUSTOM_UPSTREAM,
                                                gst_structure_new("GstForceKeyUnit",
                                                                  "all-headers",
                                                                  G_TYPE_BOOLEAN,
                                                                  TRUE,
                                                                  NULL)));
        gst_object_unref(video_output_src_pad);

        // audio (no keyframe request needed)
        GstElement *audio_downstream_joint = ep->audio_input_pipejoint();
        GstPad *audio_output_src_pad = gst_element_get_static_pad(audio_downstream_joint, "src");
        GstPad *audio_selector_sink_pad = gst_pad_get_peer(audio_output_src_pad);
        g_object_set(G_OBJECT(audio_selector_), "active-pad", audio_selector_sink_pad, NULL);
        gst_object_unref(audio_selector_sink_pad);
        gst_object_unref(audio_output_src_pad);

        speaker_ = ep;
    }
    // for test
    void change_speaker()
    {
        if (members_.empty()) {
            return;
        }
        static int member = 0;
        int current = (member++) % (int)members_.size();
        auto it = members_.begin();
        for (int i = 0; i < current; ++i) {
            ++it;
        }
        set_speaker(*it);
    }
    static GstPadProbeReturn
    on_request_pad_remove_video_probe(GstPad *pad, GstPadProbeInfo *probe_info, gpointer data);
    static GstPadProbeReturn
    on_request_pad_remove_audio_probe(GstPad *pad, GstPadProbeInfo *probe_info, gpointer data);

    static GstPadProbeReturn
    on_monitor_data(GstPad *pad, GstPadProbeInfo *info, gpointer data);

 private:
    GstElement *video_tee_;
    GstElement *audio_tee_;
    GstElement *video_selector_;
    GstElement *audio_selector_;
    GstElement *default_video_src_;
    GstElement *default_audio_src_;

    GstElement *main_pipeline_;
    std::list<WebRTC *> members_;
    WebRTC *speaker_;

    std::list<sink_link *> tee_sinks_;
    std::list<sink_link *> selector_sinks_;
    static std::mutex tee_mutex_;
    static std::mutex selector_mutex_;
};
std::mutex MultiPoints::tee_mutex_;
std::mutex MultiPoints::selector_mutex_;

GstPadProbeReturn MultiPoints::on_monitor_data(GstPad *pad,
                                               GstPadProbeInfo *info,
                                               gpointer data)
{
    GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER(info);
    if (buffer && !GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT)) {
        static guint cnt = 0;
        GDateTime *date_time = g_date_time_new_now_local();
        gchar *s_date_time = g_date_time_format(date_time, "%H:%M:%S,%F");
        g_warning("Received keyframe(%u) @ (%s)", cnt++, s_date_time);
        g_free(s_date_time);
        g_date_time_unref(date_time);
    }
    g_print(".");
    return GST_PAD_PROBE_OK;
}

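// Idle-probe callbacks: GStreamer invokes them once the request pad carries
// no data, so the branch can be unlinked, stopped, and removed without racing
// the streaming thread. The atomic compare-and-swap makes the teardown body
// run only once even if the probe fires more than once.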
GstPadProbeReturn MultiPoints::on_request_pad_remove_video_probe(GstPad *teepad, GstPadProbeInfo *probe_info, gpointer data)
{
    sink_link *info = static_cast<sink_link *>(data);
    if (!g_atomic_int_compare_and_exchange(&info->video_probe_invoke_control, TRUE, FALSE)) {
        return GST_PAD_PROBE_OK;
    }

    GstElement *joint = info->joint;
    MultiPoints *pipeline = static_cast<MultiPoints *>(info->pipeline);

    // remove this branch from the pipeline dynamically
    if (info->is_output) {
        GstPad *sinkpad = gst_element_get_static_pad(joint, "sink");
        gst_pad_unlink(info->request_pad, sinkpad);
        gst_object_unref(sinkpad);
        gst_element_set_state(joint, GST_STATE_NULL);
        g_warn_if_fail(gst_bin_remove(GST_BIN(pipeline->main_pipeline_), joint));
        gst_element_release_request_pad(pipeline->video_tee_, info->request_pad);
        gst_object_unref(info->request_pad);
        printf("~~~~~~~remove video tee pad\n");
    } else {
        GstPad *srcpad = gst_element_get_static_pad(joint, "src");
        gst_pad_unlink(srcpad, info->request_pad);
        gst_object_unref(srcpad);
        gst_element_set_state(joint, GST_STATE_NULL);
        g_warn_if_fail(gst_bin_remove(GST_BIN(pipeline->main_pipeline_), joint));
        gst_element_release_request_pad(pipeline->video_selector_, info->request_pad);
        gst_object_unref(info->request_pad);
        printf("~~~~~~~remove video selector pad\n");
    }
    delete info;
    return GST_PAD_PROBE_REMOVE;
}
GstPadProbeReturn MultiPoints::on_request_pad_remove_audio_probe(GstPad *teepad, GstPadProbeInfo *probe_info, gpointer data)
{
    sink_link *info = static_cast<sink_link *>(data);
    if (!g_atomic_int_compare_and_exchange(&info->audio_probe_invoke_control, TRUE, FALSE)) {
        return GST_PAD_PROBE_OK;
    }

    GstElement *joint = info->joint;
    MultiPoints *pipeline = static_cast<MultiPoints *>(info->pipeline);

    // remove this branch from the pipeline dynamically
    if (info->is_output) {
        GstPad *sinkpad = gst_element_get_static_pad(joint, "sink");
        gst_pad_unlink(info->request_pad, sinkpad);
        gst_object_unref(sinkpad);
        gst_element_set_state(joint, GST_STATE_NULL);
        g_warn_if_fail(gst_bin_remove(GST_BIN(pipeline->main_pipeline_), joint));
        gst_element_release_request_pad(pipeline->audio_tee_, info->request_pad);
        gst_object_unref(info->request_pad);
        printf("~~~~~~~remove audio tee pad\n");
    } else {
        GstPad *srcpad = gst_element_get_static_pad(joint, "src");
        gst_pad_unlink(srcpad, info->request_pad);
        gst_object_unref(srcpad);
        gst_element_set_state(joint, GST_STATE_NULL);
        g_warn_if_fail(gst_bin_remove(GST_BIN(pipeline->main_pipeline_), joint));
        gst_element_release_request_pad(pipeline->audio_selector_, info->request_pad);
        gst_object_unref(info->request_pad);
        printf("~~~~~~~remove audio selector pad\n");
    }
    delete info;
    return GST_PAD_PROBE_REMOVE;
}
/*---------------------------------------------------------------------------------------------*/
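// Test driver: the GLib main loop runs on its own thread with its own
// context; main() reads stdin, where 't' joins all endpoints and arms a 10 s
// timer that rotates the speaker, and 'q' quits.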
static GMainLoop *main_loop = NULL;
static GThread *main_thread = NULL;
static GMainContext *main_context = NULL;
MultiPoints *room = NULL;
GSource *test_source = NULL;

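// Timeout callback: rotate the speaker. Returning TRUE keeps the source armed.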
static gboolean test(gpointer data)
{
    MultiPoints *room = static_cast<MultiPoints *>(data);
    printf("\nchange speaker\n");
    room->change_speaker();
    return TRUE;
}
void start_test()
{
    room->join_room_all();
    g_usleep(1 * G_USEC_PER_SEC);
    test(room);

    test_source = g_timeout_source_new_seconds(10);
    g_source_set_callback(test_source, test, room, NULL);
    g_source_attach(test_source, main_context);
}
static gpointer MainLoop(gpointer)
{
    main_context = g_main_context_new();
    main_loop = g_main_loop_new(main_context, FALSE);
    g_main_context_push_thread_default(main_context);

    room = new MultiPoints();
    room->generate_webrtc_endpoint(2);

    g_main_loop_run(main_loop);
    printf("main loop quit\n");

    if (test_source) {
        g_source_destroy(test_source);
        g_source_unref(test_source);
        test_source = NULL;
    }
    delete room;
    room = NULL;

    g_main_context_pop_thread_default(main_context);

    g_main_loop_unref(main_loop);
    g_main_context_unref(main_context);

    gst_deinit();
    main_loop = NULL;
    main_context = NULL;
    return NULL;
}

int main(int argc, char *argv[])
{
    gst_init(&argc, &argv);
    main_thread = g_thread_new("mp_main_loop", MainLoop, NULL);

    int ret = 0;
    do {
        ret = getchar();
        if (ret == 't')
            start_test();
    } while (ret != 'q' && ret != EOF);

    g_main_loop_quit(main_loop);
    g_thread_join(main_thread);
    return 0;
}