oleksiyk/kafka

View on GitHub
lib/protocol/offset_commit_fetch.js

Summary

Maintainability
D
1 day
Test Coverage
'use strict';

var Protocol = require('./index');
var globals  = require('./globals');

// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

/////////////////////////////
// OFFSET COMMIT/FETCH API //
/////////////////////////////

// group coordinator request and response are defined in group_membership.js

// v0
Protocol.define('OffsetCommitRequestV0PartitionItem', {
    write: function (data) { // { partition, offset, metadata }
        this
            .Int32BE(data.partition)
            .Int64BE(data.offset)
            .string(data.metadata);
    }
});

Protocol.define('OffsetCommitRequestV0TopicItem', {
    write: function (data) { // { topicName, partitions }
        this
            .string(data.topicName)
            .array(data.partitions, this.OffsetCommitRequestV0PartitionItem);
    }
});

Protocol.define('OffsetCommitRequestV0', {
    write: function (data) { // { groupId, topics }
        this
            .RequestHeader({
                apiKey: globals.API_KEYS.OffsetCommitRequest,
                apiVersion: 0,
                correlationId: data.correlationId,
                clientId: data.clientId
            })
            .string(data.groupId)
            .array(data.topics, this.OffsetCommitRequestV0TopicItem);
    }
});

// v1
/* istanbul ignore next */
Protocol.define('OffsetCommitRequestV1PartitionItem', {
    write: function (data) { // { partition, offset, timestamp, metadata }
        this
            .Int32BE(data.partition)
            .Int64BE(data.offset)
            .Int64BE(data.timestamp)
            .string(data.metadata);
    }
});

/* istanbul ignore next */
Protocol.define('OffsetCommitRequestV1TopicItem', {
    write: function (data) { // { topicName, partitions }
        this
            .string(data.topicName)
            .array(data.partitions, this.OffsetCommitRequestV1PartitionItem);
    }
});

/* istanbul ignore next */
Protocol.define('OffsetCommitRequestV1', {
    write: function (data) { // { groupId, generationId, memberId,  topics }
        this
            .RequestHeader({
                apiKey: globals.API_KEYS.OffsetCommitRequest,
                apiVersion: 1,
                correlationId: data.correlationId,
                clientId: data.clientId
            })
            .string(data.groupId)
            .string(data.generationId)
            .string(data.memberId)
            .array(data.topics, this.OffsetCommitRequestV1TopicItem);
    }
});

// v2
Protocol.define('OffsetCommitRequestV2', {
    write: function (data) { // { groupId, generationId, memberId, retentionTime, topics }
        this
            .RequestHeader({
                apiKey: globals.API_KEYS.OffsetCommitRequest,
                apiVersion: 2,
                correlationId: data.correlationId,
                clientId: data.clientId
            })
            .string(data.groupId)
            .Int32BE(data.generationId)
            .string(data.memberId)
            .Int64BE(data.retentionTime)
            .array(data.topics, this.OffsetCommitRequestV0TopicItem); // same as in v0
    }
});


Protocol.define('OffsetCommitResponseTopicItem', {
    read: function () {
        this
            .string('topicName')
            .array('partitions', this.OffsetCommitResponsePartitionItem);
    }
});

Protocol.define('OffsetCommitResponsePartitionItem', {
    read: function () {
        this
            .Int32BE('partition')
            .ErrorCode('error');
    }
});

// v0, v1 and v2
Protocol.define('OffsetCommitResponse', {
    read: function () {
        this
            .Int32BE('correlationId')
            .array('topics', this.OffsetCommitResponseTopicItem);
    }
});


Protocol.define('OffsetFetchRequestTopicItem', {
    write: function (data) { // { topicName, partitions }
        this
            .string(data.topicName)
            .array(data.partitions, this.Int32BE);
    }
});

Protocol.define('OffsetFetchRequest', {
    write: function (data) { // { apiVersion, groupId, topics }
        this
            .RequestHeader({
                apiKey: globals.API_KEYS.OffsetFetchRequest,
                apiVersion: data.apiVersion || 0,
                correlationId: data.correlationId,
                clientId: data.clientId
            })
            .string(data.groupId)
            .array(data.topics, this.OffsetFetchRequestTopicItem);
    }
});

Protocol.define('OffsetFetchResponseTopicItem', {
    read: function () {
        this
            .string('topicName')
            .array('partitions', this.OffsetFetchResponsePartitionItem);
    }
});

Protocol.define('OffsetFetchResponsePartitionItem', {
    read: function () {
        this
            .Int32BE('partition')
            .KafkaOffset('offset')
            .string('metadata')
            .ErrorCode('error');
    }
});

Protocol.define('OffsetFetchResponse', {
    read: function () {
        this
            .Int32BE('correlationId')
            .array('topics', this.OffsetFetchResponseTopicItem);
    }
});