ホーム>source

私はKafka Streams&Kotlinと協力して、3つのトピックのストリームを持つサービスを開発しています。最初の1つはAvro値を持ち、他の2つは文字列値を持っています。

私の properties ファイル、私は SpecificAvroSerde を持っていますデフォルト値のSerdeとして、そして私は Consumed.with(Serdes.String(), Serdes.String()) を使用します文字列値を消費します。

<前>ウィズウィズ

値のデフォルトとして次のストリームの構成がある場合、Avroストリーム(最初のストリーム)は正常に動作しており、そのトピックで公開しているものを消費しています。しかし、同じ構成を使用して文字列値ストリームにパブリッシュすると、例外が発生します。

<前>ウィズウィズ

以下は、topicTwoおよびtopicThreeへのパブリッシュからの例外です。

<前>ウィズウィズ

PS。後で結合されるため、同じサービス内の3つのストリームである必要があります。

val topicOneStream = streamsBuilder.stream<String, AvroObject>(topicOne) .peek { k, _ -> logger.info("Received message with key: $k") } .flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 } val topicTwoStream = streamsBuilder .stream<String, String>(topicTwo, Consumed.with(Serdes.String(), Serdes.String())) .peek { k, _ -> logger.info("Received message with key: $k") } .flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 } val topicThreeStream = streamsBuilder.stream<String, String>(topicThree, Consumed.with(Serdes.String(), Serdes.String())) .peek { k, _ -> logger.info("Received message with key: $k") } .mapValues { v -> objectMapper.readValue(v, AdviceCreated::class.java) } .flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }
あなたの答え
  • 解決した方法 # 1

    友人(Mario Boikov)のおかげで、カフカが新しい KTable を生成するためにグループ化を行うと問題が発生します。 。グループ化にどのシリアライザを使用するかはわからないため、値にはデフォルトのSerdeを使用します。これは、私の場合は SpecificAvroSerde です。

    グループ化に必要なシリアライザーをgroupByKeyに提供することで解決しました。

    <前>ウィズウィズ

    乾杯 val topicTwoStream = streamsBuilder .stream<String, String>(topicTwo, Consumed.with(Serdes.String(), Serdes.String())) .peek { k, _ -> logger.info("Received message with key: $k") } .flatMapValues { v -> listOf(v) }.groupByKey(Grouped.with(Serdes.String(), Serdes.String())).reduce { v1, _ -> v1 } val topicThreeStream = streamsBuilder.stream<String, String>(topicThree, Consumed.with(Serdes.String(), Serdes.String())) .peek { k, _ -> logger.info("Received message with key: $k") } .flatMapValues { v -> listOf(v) }.groupByKey(Grouped.with(Serdes.String(), Serdes.String())).reduce { v1, _ -> v1 } .mapValues { v -> objectMapper.readValue(v, AdviceCreated::class.java) }

  • 前へ java - JPAクエリ:サブクエリをグループ化条件に結合する
  • 次へ python - テンソルフロー予測ループ