source
                .let(kafkaMessage())
                .map(message => {
                    try { return JSON.parse(message); }
                    catch(err) { observer.error(err); }