source
                .map(({message}) => message.value.toString('utf8'))
                .subscribe(
                    message => {
                        try { observer.next(mapper(message)); }