ホーム>source

プロセッサapiを使用して、状態ストアからメッセージを削除しています。削除は正常に機能します。kafkaキーによる状態ストアのインタラクティブクエリ呼び出しを使用して確認しましたが、tmp/kafka-streamsディレクトリの下のローカルディスクのkafkaストリームファイルサイズは減少しません。

<前>ウィズウィズ

kafkaストリームディレクトリサイズ

<前>ウィズウィズ

ファイルサイズを制御できるように、特定の構成が必要ですか?このように動作しない場合、kafka-streamsディレクトリを削除しても大丈夫ですか?このような削除は、状態ストアと変更ログのトピックの両方からレコードを削除するため、安全であると思います。

@Override public void init(ProcessorContext processorContext) { this.processorContext = processorContext; processorContext.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, new Punctuator() { @Override public void punctuate(long l) { processorContext.commit(); } }); //invoke punctuate every 12 seconds this.statestore = (KeyValueStore<String, GenericRecord>) processorContext.getStateStore(StateStoreEnum.HEADER.getStateStore()); log.info("Processor initialized"); } @Override public void process(String key, GenericRecord value) { statestore.all().forEachRemaining(keyValue -> { statestore.delete(keyValue.key); }); }
あなたの答え
  • 解決した方法 # 1

    RocksDBはバックグラウンドでファイルの圧縮を行います。したがって、より強力な圧縮が必要な場合は、カスタム RocksDBConfigSetter を渡す必要があります  Streams構成パラメーター rocksdb.config.setter 経由 。 RockDBの詳細については、RocksDBのドキュメントをご覧ください。

    https://docs.confluent.io/current/streams/developer-guide/config-streams.html#rocksdb-config-setter

    ただし、実際の問題がない限り、RocksDBの構成を変更することはお勧めしません。保存サイズがかなり小さいようですので、本当の問題はありません。

    ところで、あなたは state.dir をチャンスにするべきです  本番環境に移行し、状態をデフォルトの /tmp にしない場合の構成  ロケーション。

  • 前へ java - JPAクエリ:サブクエリをグループ化条件に結合する
  • 次へ 画面が反応ネイティブで読み込まれているときに、ボタンにアニメーションを表示するにはどうすればよいですか?