第一种:
import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.spark.{SparkConf, SparkContext} import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.protobuf.ProtobufUtil import org.apache.hadoop.hbase.util.{Base64, Bytes} import org.apache.spark.rdd.RDD import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.client.Scan import org.apache.hadoop.hbase.filter._ import org.apache.hadoop.hbase.util.Bytes val sparkConf = new SparkConf().setAppName("HbaseTest").setMaster("local[1]") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum",Spark_HbaseUtil.getProperties("bootstrap.servers") ) val tableName = "sinldo:hos_index" conf.set(TableInputFormat.INPUT_TABLE, tableName) //开始rowKey和结束rowKey一样代表精确查询的某条数据 val startRowkey = lastRowKey // 组装scan语句 startRowkey stopRowkey可以写成参数 val scan = new Scan(Bytes.toBytes(startRowkey)) //true代表不查询全表 scan.setCacheBlocks(true) scan.setCaching(9) val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); val filter = new PageFilter(10);
val proto = ProtobufUtil.toScan(scan) val scanToString = Base64.encodeBytes(proto.toByteArray) conf.set(TableInputFormat.SCAN, scanToString) val hBaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) //获取数量 val count = hBaseRDD.count() println(count)
第二种:将第一种的设置开始RoeKey的地方换成
import org.apache.hadoop.hbase.filter.RowFilter import org.apache.hadoop.hbase.filter.BinaryComparator;
val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); val filter = new PageFilter(10)
//002e代表开始的rowKey val rowFilter2: Filter = new RowFilter(CompareFilter.CompareOp.GREATER, new BinaryComparator(Bytes.toBytes("022e"))); filterList.addFilter(filter) filterList.addFilter(rowFilter2) scan.setFilter(filterList)