func KafkaLocalConsumer(ctx context.Context, topic string, oldoffset int64) (data []byte, offset int64, err error) {

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()