• Scan by timestamp

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Result, Scan}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.Base64

    val scan = new Scan()
    scan.setCaching(100)        // rows fetched per RPC round trip
    scan.setMaxResultSize(1000) // note: this limit is in bytes, not rows
    scan.setTimeRange(1588608000000L, 1604246399000L) // [min, max) in epoch millis
    // TableInputFormat reads the Scan from the conf as a Base64-encoded protobuf
    val scanStr = Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
    hbaseConf.set(TableInputFormat.SCAN, scanStr)
    spark.sparkContext.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
                                       classOf[ImmutableBytesWritable],
                                       classOf[Result]).map(x => {
      //...
    })
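
  The map body is elided above; as a minimal sketch (not from the original post), each Result can be unpacked into the row key plus a column value like this, borrowing the INFO:DT column from the next example:

    import org.apache.hadoop.hbase.util.Bytes

    // Sketch only: turn each (key, Result) pair into (rowKey, dt)
    val rows = spark.sparkContext.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
                                       classOf[ImmutableBytesWritable],
                                       classOf[Result]).map { case (_, result) =>
      val rowKey = Bytes.toString(result.getRow)
      // getValue returns null when the cell is absent, so guard with Option
      val dt = Option(result.getValue(Bytes.toBytes("INFO"), Bytes.toBytes("DT")))
        .map(b => Bytes.toString(b)).getOrElse("")
      (rowKey, dt)
    }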
• Scan by column value

    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
    import org.apache.hadoop.hbase.util.Bytes

    val scan = new Scan()
    // Keep only rows whose INFO:DT column equals "2020-11-01"
    val dtFilter = new SingleColumnValueFilter(
          Bytes.toBytes("INFO"),
          Bytes.toBytes("DT"),
          CompareOp.EQUAL,
          Bytes.toBytes("2020-11-01"))
    // Rows that lack the INFO:DT column would otherwise pass; drop them as well
    dtFilter.setFilterIfMissing(true)
    scan.setFilter(dtFilter)
    // The Scan (filter included) must still be serialized into the configuration
    val scanStr = Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
    hbaseConf.set(TableInputFormat.SCAN, scanStr)
    spark.sparkContext.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
                                       classOf[ImmutableBytesWritable],
                                       classOf[Result]).map(x => {
      //...
    })
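
When several column conditions are needed at once, the SingleColumnValueFilters can be combined with HBase's FilterList. A minimal sketch of that combination; the second column, INFO:STATUS, is purely illustrative and not from the original post:

    import org.apache.hadoop.hbase.filter.FilterList

    // Sketch: require INFO:DT == "2020-11-01" AND INFO:STATUS == "1"
    val filters = new FilterList(FilterList.Operator.MUST_PASS_ALL)
    filters.addFilter(new SingleColumnValueFilter(
      Bytes.toBytes("INFO"), Bytes.toBytes("DT"),
      CompareOp.EQUAL, Bytes.toBytes("2020-11-01")))
    filters.addFilter(new SingleColumnValueFilter(
      Bytes.toBytes("INFO"), Bytes.toBytes("STATUS"), // STATUS is a hypothetical column
      CompareOp.EQUAL, Bytes.toBytes("1")))
    scan.setFilter(filters) // then serialize the Scan into the conf as above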