makeomatic/mservice

View on GitHub
packages/plugin-kafka/src/custom/consumer-stream.ts

Summary

Maintainability
D
2 days
Test Coverage

File consumer-stream.ts has 379 lines of code (exceeds 250 allowed). Consider refactoring.
Open

import { Readable } from 'stream'
import { strict as assert } from 'assert'
import { once } from 'events'
import { uniqWith, isEqual } from 'lodash'
import { promisify, delay } from 'bluebird'
Severity: Minor
Found in packages/plugin-kafka/src/custom/consumer-stream.ts - About 5 hrs to fix

    KafkaConsumerStream has 23 functions (exceeds 20 allowed). Consider refactoring.
    Open

    export class KafkaConsumerStream extends Readable {
      private static trackingKey(topicPart: Assignment): string {
        return `${topicPart.topic}_${topicPart.partition}`
      }
    
    
    Severity: Minor
    Found in packages/plugin-kafka/src/custom/consumer-stream.ts - About 2 hrs to fix

      Function readLoop has a Cognitive Complexity of 13 (exceeds 5 allowed). Consider refactoring.
      Open

        private async readLoop(): Promise<void> {
          while (!this.consumerDisconnected() && !this.endEmitted) {
            // when consumer disconnecting it throws Error: KafkaConsumer is not connected
            const bufferAvailable = this.readableHighWaterMark - this.readableLength
            const fetchSize = this.config.streamAsBatch ? this.fetchSize : bufferAvailable
      Severity: Minor
      Found in packages/plugin-kafka/src/custom/consumer-stream.ts - About 1 hr to fix

      Cognitive Complexity

      Cognitive Complexity is a measure of how difficult a unit of code is to intuitively understand. Unlike Cyclomatic Complexity, which determines how difficult your code will be to test, Cognitive Complexity tells you how difficult your code will be to read and comprehend.

      A method's cognitive complexity is based on a few simple rules:

      • Code is not considered more complex when it uses shorthand that the language provides for collapsing multiple statements into one
      • Code is considered more complex for each "break in the linear flow of the code"
      • Code is considered more complex when "flow breaking structures are nested"

      Further reading

      Function commitMessages has a Cognitive Complexity of 12 (exceeds 5 allowed). Consider refactoring.
      Open

        public async commitMessages(messages: Message[]): Promise<void> {
          const offsets: TopicPartitionOffset[] = messages.map(m => ({
            topic: m.topic,
            partition: m.partition,
            offset: m.offset + 1,
      Severity: Minor
      Found in packages/plugin-kafka/src/custom/consumer-stream.ts - About 1 hr to fix

      Cognitive Complexity

      Cognitive Complexity is a measure of how difficult a unit of code is to intuitively understand. Unlike Cyclomatic Complexity, which determines how difficult your code will be to test, Cognitive Complexity tells you how difficult your code will be to read and comprehend.

      A method's cognitive complexity is based on a few simple rules:

      • Code is not considered more complex when it uses shorthand that the language provides for collapsing multiple statements into one
      • Code is considered more complex for each "break in the linear flow of the code"
      • Code is considered more complex when "flow breaking structures are nested"

      Further reading

      Function constructor has 35 lines of code (exceeds 25 allowed). Consider refactoring.
      Open

        constructor(consumer: KafkaConsumer, config: ConsumerStreamOptions, log?: Logger) {
          assert(consumer.isConnected(), 'consumer should be connected')
          assert(consumer instanceof KafkaConsumer, 'should be intance of KafkaConsumer')
          const fetchSize = config.fetchSize || 1
          const highWaterMark = config.streamAsBatch ? 1 : fetchSize
      Severity: Minor
      Found in packages/plugin-kafka/src/custom/consumer-stream.ts - About 1 hr to fix

        Function handleIncomingMessages has 32 lines of code (exceeds 25 allowed). Consider refactoring.
        Open

          private handleIncomingMessages(messages: Message[]): void {
            const { unacknowledgedTracker, autoStore } = this
        
            if (!this.consumerDisconnected()) {
              this.consumer.pause(this.consumer.assignments())
        Severity: Minor
        Found in packages/plugin-kafka/src/custom/consumer-stream.ts - About 1 hr to fix

          Function handleOffsetCommit has a Cognitive Complexity of 10 (exceeds 5 allowed). Consider refactoring.
          Open

            private async handleOffsetCommit(err: LibrdKafkaError | null | undefined , partitions: TopicPartitionOffset[]): Promise<void> {
              if (err) {
                const wrappedError = new OffsetCommitError(partitions, this.trackerMeta, err)
          
                this.log?.warn({ err: wrappedError }, '[commit] offset commit error')
          Severity: Minor
          Found in packages/plugin-kafka/src/custom/consumer-stream.ts - About 1 hr to fix

          Cognitive Complexity

          Cognitive Complexity is a measure of how difficult a unit of code is to intuitively understand. Unlike Cyclomatic Complexity, which determines how difficult your code will be to test, Cognitive Complexity tells you how difficult your code will be to read and comprehend.

          A method's cognitive complexity is based on a few simple rules:

          • Code is not considered more complex when it uses shorthand that the language provides for collapsing multiple statements into one
          • Code is considered more complex for each "break in the linear flow of the code"
          • Code is considered more complex when "flow breaking structures are nested"

          Further reading

          Function checkEof has a Cognitive Complexity of 10 (exceeds 5 allowed). Consider refactoring.
          Open

            private async checkEof(): Promise<void> {
              if (this.inDestroyingState() || !this.config.stopOnPartitionsEOF) {
                this.log?.debug('checkEof: destroying')
                return
              }
          Severity: Minor
          Found in packages/plugin-kafka/src/custom/consumer-stream.ts - About 1 hr to fix

          Cognitive Complexity

          Cognitive Complexity is a measure of how difficult a unit of code is to intuitively understand. Unlike Cyclomatic Complexity, which determines how difficult your code will be to test, Cognitive Complexity tells you how difficult your code will be to read and comprehend.

          A method's cognitive complexity is based on a few simple rules:

          • Code is not considered more complex when it uses shorthand that the language provides for collapsing multiple statements into one
          • Code is considered more complex for each "break in the linear flow of the code"
          • Code is considered more complex when "flow breaking structures are nested"

          Further reading

          Function handleOffsetCommit has 27 lines of code (exceeds 25 allowed). Consider refactoring.
          Open

            private async handleOffsetCommit(err: LibrdKafkaError | null | undefined , partitions: TopicPartitionOffset[]): Promise<void> {
              if (err) {
                const wrappedError = new OffsetCommitError(partitions, this.trackerMeta, err)
          
                this.log?.warn({ err: wrappedError }, '[commit] offset commit error')
          Severity: Minor
          Found in packages/plugin-kafka/src/custom/consumer-stream.ts - About 1 hr to fix

            Function checkEof has 27 lines of code (exceeds 25 allowed). Consider refactoring.
            Open

              private async checkEof(): Promise<void> {
                if (this.inDestroyingState() || !this.config.stopOnPartitionsEOF) {
                  this.log?.debug('checkEof: destroying')
                  return
                }
            Severity: Minor
            Found in packages/plugin-kafka/src/custom/consumer-stream.ts - About 1 hr to fix

              Function handleIncomingMessages has a Cognitive Complexity of 9 (exceeds 5 allowed). Consider refactoring.
              Open

                private handleIncomingMessages(messages: Message[]): void {
                  const { unacknowledgedTracker, autoStore } = this
              
                  if (!this.consumerDisconnected()) {
                    this.consumer.pause(this.consumer.assignments())
              Severity: Minor
              Found in packages/plugin-kafka/src/custom/consumer-stream.ts - About 55 mins to fix

              Cognitive Complexity

              Cognitive Complexity is a measure of how difficult a unit of code is to intuitively understand. Unlike Cyclomatic Complexity, which determines how difficult your code will be to test, Cognitive Complexity tells you how difficult your code will be to read and comprehend.

              A method's cognitive complexity is based on a few simple rules:

              • Code is not considered more complex when it uses shorthand that the language provides for collapsing multiple statements into one
              • Code is considered more complex for each "break in the linear flow of the code"
              • Code is considered more complex when "flow breaking structures are nested"

              Further reading

              Function updatePartitionOffsets has a Cognitive Complexity of 8 (exceeds 5 allowed). Consider refactoring.
              Open

                private updatePartitionOffsets(partitions: Assignment[], map: CommitOffsetTracker): void {
                  for (const topicPartition of partitions) {
                    const trackingKey = KafkaConsumerStream.trackingKey(topicPartition)
              
                    // if it has offset - verify that the current offset is smaller
              Severity: Minor
              Found in packages/plugin-kafka/src/custom/consumer-stream.ts - About 45 mins to fix

              Cognitive Complexity

              Cognitive Complexity is a measure of how difficult a unit of code is to intuitively understand. Unlike Cyclomatic Complexity, which determines how difficult your code will be to test, Cognitive Complexity tells you how difficult your code will be to read and comprehend.

              A method's cognitive complexity is based on a few simple rules:

              • Code is not considered more complex when it uses shorthand that the language provides for collapsing multiple statements into one
              • Code is considered more complex for each "break in the linear flow of the code"
              • Code is considered more complex when "flow breaking structures are nested"

              Further reading

              Similar blocks of code found in 2 locations. Consider refactoring.
              Open

                  if (uniqMessages.length != messages.length) {
                    this.log?.warn({ uniqLength: uniqMessages.length, origLength: messages.length }, '[dup] Duplicates received')
                  }
              Severity: Minor
              Found in packages/plugin-kafka/src/custom/consumer-stream.ts and 1 other location - About 55 mins to fix
              packages/plugin-kafka/src/custom/consumer-stream.ts on lines 341..343

              Duplicated Code

              Duplicated code can lead to software that is hard to understand and difficult to change. The Don't Repeat Yourself (DRY) principle states:

              Every piece of knowledge must have a single, unambiguous, authoritative representation within a system.

              When you violate DRY, bugs and maintenance problems are sure to follow. Duplicated code has a tendency to both continue to replicate and also to diverge (leaving bugs as two similar implementations differ in subtle ways).

              Tuning

              This issue has a mass of 54.

              We set useful threshold defaults for the languages we support but you may want to adjust these settings based on your project guidelines.

              The threshold configuration represents the minimum mass a code block must have to be analyzed for duplication. The lower the threshold, the more fine-grained the comparison.

              If the engine is too easily reporting duplication, try raising the threshold. If you suspect that the engine isn't catching enough duplication, try lowering the threshold. The best setting tends to differ from language to language.

              See codeclimate-duplication's documentation for more information about tuning the mass threshold in your .codeclimate.yml.

              Refactorings

              Further Reading

              Similar blocks of code found in 2 locations. Consider refactoring.
              Open

                  if (exceptPreviousOffset.length != uniqMessages.length) {
                    this.log?.warn({ filtered: exceptPreviousOffset.length, uniqMessages: uniqMessages.length }, '[dup] Previous offset data received')
                  }
              Severity: Minor
              Found in packages/plugin-kafka/src/custom/consumer-stream.ts and 1 other location - About 55 mins to fix
              packages/plugin-kafka/src/custom/consumer-stream.ts on lines 337..339

              Duplicated Code

              Duplicated code can lead to software that is hard to understand and difficult to change. The Don't Repeat Yourself (DRY) principle states:

              Every piece of knowledge must have a single, unambiguous, authoritative representation within a system.

              When you violate DRY, bugs and maintenance problems are sure to follow. Duplicated code has a tendency to both continue to replicate and also to diverge (leaving bugs as two similar implementations differ in subtle ways).

              Tuning

              This issue has a mass of 54.

              We set useful threshold defaults for the languages we support but you may want to adjust these settings based on your project guidelines.

              The threshold configuration represents the minimum mass a code block must have to be analyzed for duplication. The lower the threshold, the more fine-grained the comparison.

              If the engine is too easily reporting duplication, try raising the threshold. If you suspect that the engine isn't catching enough duplication, try lowering the threshold. The best setting tends to differ from language to language.

              See codeclimate-duplication's documentation for more information about tuning the mass threshold in your .codeclimate.yml.

              Refactorings

              Further Reading

              There are no issues that match your filters.

              Category
              Status