プロセッサapiを使用して、状態ストアからメッセージを削除しています。削除は正常に機能します。kafkaキーによる状態ストアのインタラクティブクエリ呼び出しを使用して確認しましたが、tmp/kafka-streamsディレクトリの下のローカルディスクのkafkaストリームファイルサイズは減少しません。
<前>ウィズウィズkafkaストリームディレクトリサイズ
<前>ウィズウィズファイルサイズを制御できるように、特定の構成が必要ですか?このように動作しない場合、kafka-streamsディレクトリを削除しても大丈夫ですか?このような削除は、状態ストアと変更ログのトピックの両方からレコードを削除するため、安全であると思います。
@Override
public void init(ProcessorContext processorContext) {
this.processorContext = processorContext;
processorContext.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, new Punctuator() {
@Override
public void punctuate(long l) {
processorContext.commit();
}
}); //invoke punctuate every 12 seconds
this.statestore = (KeyValueStore<String, GenericRecord>) processorContext.getStateStore(StateStoreEnum.HEADER.getStateStore());
log.info("Processor initialized");
}
@Override
public void process(String key, GenericRecord value) {
statestore.all().forEachRemaining(keyValue -> {
statestore.delete(keyValue.key);
});
}
- 解決した方法 # 1
関連した質問
- SpringとKafka:Kafka ProducerとKafka Streamsに同じバインディングを使用する
- java - Kafkaストリーム、複数のトピックへの分岐出力
- カフカINVALID_FETCH_SESSION_EPOCH
- java - Kafkaストリームエラー:SerializationException:LongDeserializerが受信したデータのサイズが8ではありません
- Kafka Streams - カフカストリーム:作成されていないストア
- KafkaストリーミングDSL機能を使用して重複メッセージを処理する方法
- java - Kafka Streams Partition Assignorを理解する
- priority queue - Apache Kafka 20でメッセージに優先順位を付ける方法はありますか?
- kotlin - Kafka Streams StreamsExceptionを複数のストリームで使用すると
RocksDBはバックグラウンドでファイルの圧縮を行います。したがって、より強力な圧縮が必要な場合は、カスタム
RocksDBConfigSetter
を渡す必要があります Streams構成パラメーターrocksdb.config.setter
経由 。 RockDBの詳細については、RocksDBのドキュメントをご覧ください。https://docs.confluent.io/current/streams/developer-guide/config-streams.html#rocksdb-config-setter
ただし、実際の問題がない限り、RocksDBの構成を変更することはお勧めしません。保存サイズがかなり小さいようですので、本当の問題はありません。
ところで、あなたは
state.dir
をチャンスにするべきです 本番環境に移行し、状態をデフォルトの/tmp
にしない場合の構成 ロケーション。