ホーム>source

以下のCassandraテーブルを見つけてください。

1つのCassandraテーブルから同じ構造の別のCassandraテーブルにデータをコピーしようとしています。

私を助けてください。

CREATE TABLE data2 (
        d_no text,
        d_type text,
        sn_perc int,
        tse_dt timestamp,
        f_lvl text,
        ign_f boolean,
        lk_loc text,
        lk_ts timestamp,
        mi_rem text,
        nr_fst text,
        perm_stat text,
        rec_crt_dt timestamp,
        sr_stat text,
        sor_query text,
        tp_dat text,
        tp_ts timestamp,
        tr_rem text,
        tr_type text,
        PRIMARY KEY (device_serial_no, device_type)
    ) WITH CLUSTERING ORDER BY (device_type ASC)

以下を使用して挿入されたデータ:

Insert into data2(all column names) values('64FCFCFC','HUM',4,'1970-01-02 05:30:00’ ,’NA’,true,'NA','1970-01-02 05:40:00',’NA’,'NA','NA','1970-02-01 05:30:00','NA','NA','NA','1970-02-03 05:30:00','NA','NA');

注意: この '1970-01-02 05:30:00'のように挿入しようとする4番目の列のタイムスタンプ、およびdtaframeにも正しくタイムスタンプが挿入されますが、データフレームからcassandraに挿入し、テーブルからselect *を使用すると、その1970-01-02 00:00:00.000000 + 0000のように挿入されます

同様に、すべてのタイムスタンプ列についても同様です。

pom.xml

<dependencies>
       <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector -->
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.3.1</version>
</dependency>


これらの値を読み取り、spark Scalaを使用して別のCassandraテーブルに書き込みます。以下のコードを参照してください:

val df2 = spark.read
                       .format("org.apache.spark.sql.cassandra")
                       .option("spark.cassandra.connection.host","hostname")
                       .option("spark.cassandra.connection.port","9042")
                       .option( "spark.cassandra.auth.username","usr")
                       .option("spark.cassandra.auth.password","pas")
                       .option("keyspace","hr")
                       .option("table","data2")
                       .load()
Val df3 =doing some processing on df2.
df3.write
         .format("org.apache.spark.sql.cassandra")
         .mode("append")
         .option("spark.cassandra.connection.host","hostname")
         .option("spark.cassandra.connection.port","9042")
         .option( "spark.cassandra.auth.username","usr")
         .option("spark.cassandra.auth.password","pas")
         .option("spark.cassandra.output.ignoreNulls","true")
         .option("confirm.truncate","true")
         .option("keyspace","hr")
         .option("table","data3")
         .save()

しかし、上記のコードを使用してデータを挿入しようとすると、以下のエラーが発生します、

java.lang.IllegalArgumentException: requirement failed: Invalid row size: 18 instead of 17.
    at scala.Predef$.require(Predef.scala:224)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:23)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:99)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

あなたの答え
  • 解決した方法 # 1

    これは既知の問題です(SPARKC-541)-DSE Searchが有効になっているテーブルのデータを、それなしのテーブルにコピーしています。変換の一部としてこの列をドロップするだけです。

    val df3 = df2.drop("solr_query").... // your transformations
    
    

    または、新しいドライバー(OSSドライバーを使用している場合は2.3.1)、またはこの修正を含む対応するDSEリリースを使用するだけです。

関連記事

  • 前へ java - JPAクエリ:サブクエリをグループ化条件に結合する
  • 次へ javascript - 都市名で現在時刻を取得する