File consumer-stream.ts
has 379 lines of code (exceeds 250 allowed). Consider refactoring. Open
import { Readable } from 'stream'
import { strict as assert } from 'assert'
import { once } from 'events'
import { uniqWith, isEqual } from 'lodash'
import { promisify, delay } from 'bluebird'
KafkaConsumerStream
has 23 functions (exceeds 20 allowed). Consider refactoring. Open
export class KafkaConsumerStream extends Readable {
private static trackingKey(topicPart: Assignment): string {
return `${topicPart.topic}_${topicPart.partition}`
}
Function readLoop
has a Cognitive Complexity of 13 (exceeds 5 allowed). Consider refactoring. Open
private async readLoop(): Promise<void> {
while (!this.consumerDisconnected() && !this.endEmitted) {
// when consumer disconnecting it throws Error: KafkaConsumer is not connected
const bufferAvailable = this.readableHighWaterMark - this.readableLength
const fetchSize = this.config.streamAsBatch ? this.fetchSize : bufferAvailable
- Read upRead up
Cognitive Complexity
Cognitive Complexity is a measure of how difficult a unit of code is to intuitively understand. Unlike Cyclomatic Complexity, which determines how difficult your code will be to test, Cognitive Complexity tells you how difficult your code will be to read and comprehend.
A method's cognitive complexity is based on a few simple rules:
- Code is not considered more complex when it uses shorthand that the language provides for collapsing multiple statements into one
- Code is considered more complex for each "break in the linear flow of the code"
- Code is considered more complex when "flow breaking structures are nested"
Further reading
Function commitMessages
has a Cognitive Complexity of 12 (exceeds 5 allowed). Consider refactoring. Open
public async commitMessages(messages: Message[]): Promise<void> {
const offsets: TopicPartitionOffset[] = messages.map(m => ({
topic: m.topic,
partition: m.partition,
offset: m.offset + 1,
- Read upRead up
Cognitive Complexity
Cognitive Complexity is a measure of how difficult a unit of code is to intuitively understand. Unlike Cyclomatic Complexity, which determines how difficult your code will be to test, Cognitive Complexity tells you how difficult your code will be to read and comprehend.
A method's cognitive complexity is based on a few simple rules:
- Code is not considered more complex when it uses shorthand that the language provides for collapsing multiple statements into one
- Code is considered more complex for each "break in the linear flow of the code"
- Code is considered more complex when "flow breaking structures are nested"
Further reading
Function constructor
has 35 lines of code (exceeds 25 allowed). Consider refactoring. Open
constructor(consumer: KafkaConsumer, config: ConsumerStreamOptions, log?: Logger) {
assert(consumer.isConnected(), 'consumer should be connected')
assert(consumer instanceof KafkaConsumer, 'should be intance of KafkaConsumer')
const fetchSize = config.fetchSize || 1
const highWaterMark = config.streamAsBatch ? 1 : fetchSize
Function handleIncomingMessages
has 32 lines of code (exceeds 25 allowed). Consider refactoring. Open
private handleIncomingMessages(messages: Message[]): void {
const { unacknowledgedTracker, autoStore } = this
if (!this.consumerDisconnected()) {
this.consumer.pause(this.consumer.assignments())
Function handleOffsetCommit
has a Cognitive Complexity of 10 (exceeds 5 allowed). Consider refactoring. Open
private async handleOffsetCommit(err: LibrdKafkaError | null | undefined , partitions: TopicPartitionOffset[]): Promise<void> {
if (err) {
const wrappedError = new OffsetCommitError(partitions, this.trackerMeta, err)
this.log?.warn({ err: wrappedError }, '[commit] offset commit error')
- Read upRead up
Cognitive Complexity
Cognitive Complexity is a measure of how difficult a unit of code is to intuitively understand. Unlike Cyclomatic Complexity, which determines how difficult your code will be to test, Cognitive Complexity tells you how difficult your code will be to read and comprehend.
A method's cognitive complexity is based on a few simple rules:
- Code is not considered more complex when it uses shorthand that the language provides for collapsing multiple statements into one
- Code is considered more complex for each "break in the linear flow of the code"
- Code is considered more complex when "flow breaking structures are nested"
Further reading
Function checkEof
has a Cognitive Complexity of 10 (exceeds 5 allowed). Consider refactoring. Open
private async checkEof(): Promise<void> {
if (this.inDestroyingState() || !this.config.stopOnPartitionsEOF) {
this.log?.debug('checkEof: destroying')
return
}
- Read upRead up
Cognitive Complexity
Cognitive Complexity is a measure of how difficult a unit of code is to intuitively understand. Unlike Cyclomatic Complexity, which determines how difficult your code will be to test, Cognitive Complexity tells you how difficult your code will be to read and comprehend.
A method's cognitive complexity is based on a few simple rules:
- Code is not considered more complex when it uses shorthand that the language provides for collapsing multiple statements into one
- Code is considered more complex for each "break in the linear flow of the code"
- Code is considered more complex when "flow breaking structures are nested"
Further reading
Function handleOffsetCommit
has 27 lines of code (exceeds 25 allowed). Consider refactoring. Open
private async handleOffsetCommit(err: LibrdKafkaError | null | undefined , partitions: TopicPartitionOffset[]): Promise<void> {
if (err) {
const wrappedError = new OffsetCommitError(partitions, this.trackerMeta, err)
this.log?.warn({ err: wrappedError }, '[commit] offset commit error')
Function checkEof
has 27 lines of code (exceeds 25 allowed). Consider refactoring. Open
private async checkEof(): Promise<void> {
if (this.inDestroyingState() || !this.config.stopOnPartitionsEOF) {
this.log?.debug('checkEof: destroying')
return
}
Function handleIncomingMessages
has a Cognitive Complexity of 9 (exceeds 5 allowed). Consider refactoring. Open
private handleIncomingMessages(messages: Message[]): void {
const { unacknowledgedTracker, autoStore } = this
if (!this.consumerDisconnected()) {
this.consumer.pause(this.consumer.assignments())
- Read upRead up
Cognitive Complexity
Cognitive Complexity is a measure of how difficult a unit of code is to intuitively understand. Unlike Cyclomatic Complexity, which determines how difficult your code will be to test, Cognitive Complexity tells you how difficult your code will be to read and comprehend.
A method's cognitive complexity is based on a few simple rules:
- Code is not considered more complex when it uses shorthand that the language provides for collapsing multiple statements into one
- Code is considered more complex for each "break in the linear flow of the code"
- Code is considered more complex when "flow breaking structures are nested"
Further reading
Function updatePartitionOffsets
has a Cognitive Complexity of 8 (exceeds 5 allowed). Consider refactoring. Open
private updatePartitionOffsets(partitions: Assignment[], map: CommitOffsetTracker): void {
for (const topicPartition of partitions) {
const trackingKey = KafkaConsumerStream.trackingKey(topicPartition)
// if it has offset - verify that the current offset is smaller
- Read upRead up
Cognitive Complexity
Cognitive Complexity is a measure of how difficult a unit of code is to intuitively understand. Unlike Cyclomatic Complexity, which determines how difficult your code will be to test, Cognitive Complexity tells you how difficult your code will be to read and comprehend.
A method's cognitive complexity is based on a few simple rules:
- Code is not considered more complex when it uses shorthand that the language provides for collapsing multiple statements into one
- Code is considered more complex for each "break in the linear flow of the code"
- Code is considered more complex when "flow breaking structures are nested"
Further reading
Similar blocks of code found in 2 locations. Consider refactoring. Open
if (uniqMessages.length != messages.length) {
this.log?.warn({ uniqLength: uniqMessages.length, origLength: messages.length }, '[dup] Duplicates received')
}
- Read upRead up
Duplicated Code
Duplicated code can lead to software that is hard to understand and difficult to change. The Don't Repeat Yourself (DRY) principle states:
Every piece of knowledge must have a single, unambiguous, authoritative representation within a system.
When you violate DRY, bugs and maintenance problems are sure to follow. Duplicated code has a tendency to both continue to replicate and also to diverge (leaving bugs as two similar implementations differ in subtle ways).
Tuning
This issue has a mass of 54.
We set useful threshold defaults for the languages we support but you may want to adjust these settings based on your project guidelines.
The threshold configuration represents the minimum mass a code block must have to be analyzed for duplication. The lower the threshold, the more fine-grained the comparison.
If the engine is too easily reporting duplication, try raising the threshold. If you suspect that the engine isn't catching enough duplication, try lowering the threshold. The best setting tends to differ from language to language.
See codeclimate-duplication
's documentation for more information about tuning the mass threshold in your .codeclimate.yml
.
Refactorings
- Extract Method
- Extract Class
- Form Template Method
- Introduce Null Object
- Pull Up Method
- Pull Up Field
- Substitute Algorithm
Further Reading
- Don't Repeat Yourself on the C2 Wiki
- Duplicated Code on SourceMaking
- Refactoring: Improving the Design of Existing Code by Martin Fowler. Duplicated Code, p76
Similar blocks of code found in 2 locations. Consider refactoring. Open
if (exceptPreviousOffset.length != uniqMessages.length) {
this.log?.warn({ filtered: exceptPreviousOffset.length, uniqMessages: uniqMessages.length }, '[dup] Previous offset data received')
}
- Read upRead up
Duplicated Code
Duplicated code can lead to software that is hard to understand and difficult to change. The Don't Repeat Yourself (DRY) principle states:
Every piece of knowledge must have a single, unambiguous, authoritative representation within a system.
When you violate DRY, bugs and maintenance problems are sure to follow. Duplicated code has a tendency to both continue to replicate and also to diverge (leaving bugs as two similar implementations differ in subtle ways).
Tuning
This issue has a mass of 54.
We set useful threshold defaults for the languages we support but you may want to adjust these settings based on your project guidelines.
The threshold configuration represents the minimum mass a code block must have to be analyzed for duplication. The lower the threshold, the more fine-grained the comparison.
If the engine is too easily reporting duplication, try raising the threshold. If you suspect that the engine isn't catching enough duplication, try lowering the threshold. The best setting tends to differ from language to language.
See codeclimate-duplication
's documentation for more information about tuning the mass threshold in your .codeclimate.yml
.
Refactorings
- Extract Method
- Extract Class
- Form Template Method
- Introduce Null Object
- Pull Up Method
- Pull Up Field
- Substitute Algorithm
Further Reading
- Don't Repeat Yourself on the C2 Wiki
- Duplicated Code on SourceMaking
- Refactoring: Improving the Design of Existing Code by Martin Fowler. Duplicated Code, p76