相关推荐recommended
Springboot整合HBase
作者:mmseoamin日期:2023-12-14

Springboot整合HBase数据库

1、添加依赖


    com.spring4all
    spring-boot-starter-hbase


    org.springframework.data
    spring-data-hadoop-hbase
    2.5.0.RELEASE


    org.springframework.data
    spring-data-hadoop
    2.5.0.RELEASE

2、添加配置
通过Yaml方式配置
spring:
  hbase:
     zookeeper:
      quorum: hbase1.xxx.org,hbase2.xxx.org,hbase3.xxx.org
      property:
         clientPort: 2181
  data:
    hbase:
      quorum: XXX
      rootDir: XXX
      nodeParent: XXX
zookeeper:
  znode:
    parent: /hbase
3、添加配置类
@Configuration
public class HBaseConfig {
    @Bean
    public HBaseService getHbaseService() {
        //设置临时的hadoop环境变量,之后程序会去这个目录下的\bin目录下找winutils.exe工具,windows连接hadoop时会用到
        //System.setProperty("hadoop.home.dir", "D:\\Program Files\\Hadoop");
        //执行此步时,会去resources目录下找相应的配置文件,例如hbase-site.xml
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        return new HBaseService(conf);
    }
}
4、工具类的方式实现HBASE操作
@Service
public class HBaseService {
    private Admin admin = null;
    private Connection connection = null;
    public HBaseService(Configuration conf) {
        connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();
    }
    //创建表 create , {NAME => , VERSIONS => }
    public boolean creatTable(String tableName, List columnFamily) {
        //列族column family
        List cfDesc = new ArrayList<>(columnFamily.size());
        columnFamily.forEach(cf -> {
            cfDesc.add(ColumnFamilyDescriptorBuilder.newBuilder(
                Bytes.toBytes(cf)).build());
        });
        //表 table
        TableDescriptor tableDesc = TableDescriptorBuilder
            .newBuilder(TableName.valueOf(tableName))
            .setColumnFamilies(cfDesc).build();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            log.debug("table Exists!");
        } else {
            admin.createTable(tableDesc);
            log.debug("create table Success!");
        }
        close(admin, null, null);
        return true;
    }
    public List getAllTableNames() {
        List result = new ArrayList<>();
        TableName[] tableNames = admin.listTableNames();
        for (TableName tableName : tableNames) {
            result.add(tableName.getNameAsString());
        }
        close(admin, null, null);
        return result;
    }
    public Map> getResultScanner(String tableName) {
        Scan scan = new Scan();
        return this.queryData(tableName, scan);
    }
    private Map> queryData(String tableName, Scan scan) {
        // 
        Map> result = new HashMap<>();
        ResultScanner rs = null;
        //获取表
        Table table = null;
        table = getTable(tableName);
        rs = table.getScanner(scan);
        for (Result r : rs) {
            // 每一行数据
            Map columnMap = new HashMap<>();
            String rowKey = null;
            // 行键,列族和列限定符一起确定一个单元(Cell)
            for (Cell cell : r.listCells()) {
                if (rowKey == null) {
                    rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                }
                columnMap.put(
                    //列限定符
                    Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
                    //列族
                    Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
            }
            if (rowKey != null) {
                result.put(rowKey, columnMap);
            }
        }
        close(null, rs, table);
        return result;
    }
    public void putData(String tableName, String rowKey, String familyName, String[] columns, String[] values) {
        Table table = null;
        table = getTable(tableName);
        putData(table, rowKey, tableName, familyName, columns, values);
        close(null, null, table);
    }
    private void putData(Table table, String rowKey, String tableName, 
                         String familyName, String[] columns, String[] values) {
        //设置rowkey
        Put put = new Put(Bytes.toBytes(rowKey));
        if (columns != null && values != null && columns.length == values.length) {
            for (int i = 0; i < columns.length; i++) {
                if (columns[i] != null && values[i] != null) {
                    put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
                } else {
                    throw new NullPointerException(MessageFormat.format(
                        "列名和列数据都不能为空,column:{0},value:{1}", columns[i], values[i]));
                }
            }
        }
        table.put(put);
        log.debug("putData add or update data Success,rowKey:" + rowKey);
        table.close();
    }
    private Table getTable(String tableName) throws IOException {
        return connection.getTable(TableName.valueOf(tableName));
    }
    private void close(Admin admin, ResultScanner rs, Table table) {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                log.error("关闭Admin失败", e);
            }
            if (rs != null) {
                rs.close();
            }
            if (table != null) {
                rs.close();
            }
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    log.error("关闭Table失败", e);
                }
            }
        }
    }
}
 
测试类
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
class HBaseApplicationTests {
    @Resource
    private HBaseService hbaseService;
    //测试创建表
    @Test
    public void testCreateTable() {
        hbaseService.creatTable("test_base", Arrays.asList("a", "back"));
    }
    //测试加入数据
    @Test
    public void testPutData() {
        hbaseService.putData("test_base", "000001", "a", new String[]{
                "project_id", "varName", "coefs", "pvalues", "tvalues",
                "create_time"}, new String[]{"40866", "mob_3", "0.9416",
                "0.0000", "12.2293", "null"});
        hbaseService.putData("test_base", "000002", "a", new String[]{
                "project_id", "varName", "coefs", "pvalues", "tvalues",
                "create_time"}, new String[]{"40866", "idno_prov", "0.9317",
                "0.0000", "9.8679", "null"});
        hbaseService.putData("test_base", "000003", "a", new String[]{
                "project_id", "varName", "coefs", "pvalues", "tvalues",
                "create_time"}, new String[]{"40866", "education", "0.8984",
                "0.0000", "25.5649", "null"});
    }
    //测试遍历全表
    @Test
    public void testGetResultScanner() {
        Map> result2 = hbaseService.getResultScanner("test_base");
        System.out.println("-----遍历查询全表内容-----");
        result2.forEach((k, value) -> {
            System.out.println(k + "--->" + value);
        });
    }
}

三、使用spring-data-hadoop-hbase

3、配置类
@Configuration
public class HBaseConfiguration {
 
    @Value("${hbase.zookeeper.quorum}")
    private String zookeeperQuorum;
 
    @Value("${hbase.zookeeper.property.clientPort}")
    private String clientPort;
 
    @Value("${zookeeper.znode.parent}")
    private String znodeParent;
 
    @Bean
    public HbaseTemplate hbaseTemplate() {
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
        conf.set("hbase.zookeeper.property.clientPort", clientPort);
        conf.set("zookeeper.znode.parent", znodeParent);
        return new HbaseTemplate(conf);
    }
}
4、业务类中使用HbaseTemplate

这个是作为工具类

@Service
@Slf4j
public class HBaseService {
 
 
    @Autowired
    private HbaseTemplate hbaseTemplate;
 	
 	//查询列簇
    public List getRowKeyAndColumn(String tableName, String startRowkey, 
                                           String stopRowkey, String column, String qualifier) {
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        if (StringUtils.isNotBlank(column)) {
            log.debug("{}", column);
            filterList.addFilter(new FamilyFilter(CompareFilter.CompareOp.EQUAL,
                       new BinaryComparator(Bytes.toBytes(column))));
        }
        if (StringUtils.isNotBlank(qualifier)) {
            log.debug("{}", qualifier);
            filterList.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL, 
                       new BinaryComparator(Bytes.toBytes(qualifier))));
        }
        Scan scan = new Scan();
        if (filterList.getFilters().size() > 0) {
            scan.setFilter(filterList);
        }
        scan.setStartRow(Bytes.toBytes(startRowkey));
        scan.setStopRow(Bytes.toBytes(stopRowkey));
 
        return hbaseTemplate.find(tableName, scan, (rowMapper, rowNum) -> rowMapper);
    }
 
    public List getListRowkeyData(String tableName, List rowKeys, 
                                          String familyColumn, String column) {
        return rowKeys.stream().map(rk -> {
            if (StringUtils.isNotBlank(familyColumn)) {
                if (StringUtils.isNotBlank(column)) {
                    return hbaseTemplate.get(tableName, rk, familyColumn, 
                                column, (rowMapper, rowNum) -> rowMapper);
                } else {
                    return hbaseTemplate.get(tableName, rk, familyColumn,
                                (rowMapper, rowNum) -> rowMapper);
                }
            }
            return hbaseTemplate.get(tableName, rk, (rowMapper, rowNum) -> rowMapper);
        }).collect(Collectors.toList());
    }
}

四、使用spring-boot-starter-data-hbase

参考:https://blog.csdn.net/cpongo1/article/details/89550486

## 下载spring-boot-starter-hbase代码
git clone https://github.com/SpringForAll/spring-boot-starter-hbase.git
## 安装
cd spring-boot-starter-hbase
mvn clean install
2、添加配置项
  • spring.data.hbase.quorum 指定 HBase 的 zk 地址
  • spring.data.hbase.rootDir 指定 HBase 在 HDFS 上存储的路径
  • spring.data.hbase.nodeParent 指定 ZK 中 HBase 的根 ZNode
    3、定义好DTO
    @Data
    public class City {
        private Long id;
        private Integer age;
        private String cityName;  
    }
    
    4、创建对应rowMapper
    public class CityRowMapper implements RowMapper {
     
        private static byte[] COLUMN_FAMILY = "f".getBytes();
        private static byte[] NAME = "name".getBytes();
        private static byte[] AGE = "age".getBytes();
     
        @Override
        public City mapRow(Result result, int rowNum) throws Exception {
            String name = Bytes.toString(result.getValue(COLUMN_FAMILY, NAME));
            int age = Bytes.toInt(result.getValue(COLUMN_FAMILY, AGE));
     
            City dto = new City();
            dto.setCityName(name);
            dto.setAge(age);
            return dto;
        }
    }
    
    5、操作实现增改查
    • HbaseTemplate.find 返回 HBase 映射的 City 列表
    • HbaseTemplate.get 返回 row 对应的 City 信息
    • HbaseTemplate.saveOrUpdates 保存或者更新

      如果 HbaseTemplate 操作不满足需求,完全可以使用 hbaseTemplate 的getConnection() 方法,获取连接。进而类似 HbaseTemplate 实现的逻辑,实现更复杂的需求查询等功能

      @Service
      public class CityServiceImpl implements CityService {
       
          @Autowired private HbaseTemplate hbaseTemplate;
       	//查询
          public List query(String startRow, String stopRow) {
              Scan scan = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(stopRow));
              scan.setCaching(5000);
              List dtos = this.hbaseTemplate.find("people_table", scan, new CityRowMapper());
              return dtos;
          }
       	//查询
          public City query(String row) {
              City dto = this.hbaseTemplate.get("people_table", row, new CityRowMapper());
              return dto;
          }
       	//新增或者更新
          public void saveOrUpdate() {
              List saveOrUpdates = new ArrayList();
              Put            put           = new Put(Bytes.toBytes("135xxxxxx"));
              put.addColumn(Bytes.toBytes("people"), Bytes.toBytes("name"), Bytes.toBytes("test"));
              saveOrUpdates.add(put);
              this.hbaseTemplate.saveOrUpdates("people_table", saveOrUpdates);
          }
      }
      

      Springboot整合Influxdb

      中文文档:https://jasper-zhang1.gitbooks.io/influxdb/content/Introduction/installation.html

      注意,项目建立在spring-boot-web基础上

      1、添加依赖
      
          org.influxdb
          influxdb-java
          2.15
      
      
      2、添加配置
      spring:
        influx:
          database: my_sensor1
          password: admin
          url: http://127.0.0.1:6086
          user: admin
      
      3、编写配置类
      @Configuration
      public class InfluxdbConfig {
              
          @Value("${spring.influx.url}")
          private String influxDBUrl; 
          @Value("${spring.influx.user}")
          private String userName;    
          @Value("${spring.influx.password}")
          private String password;    
          @Value("${spring.influx.database}")
          private String database;    
          @Bean("influxDB")
          public InfluxDB influxdb(){     
              InfluxDB influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password);
              try {
                  
                  /** 
                   * 异步插入:
                   * enableBatch这里第一个是point的个数,第二个是时间,单位毫秒    
                   * point的个数和时间是联合使用的,如果满100条或者60 * 1000毫秒   
                   * 满足任何一个条件就会发送一次写的请求。
                   */
                  influxDB.setDatabase(database).enableBatch(100,1000 * 60, TimeUnit.MILLISECONDS);
                  
              } catch (Exception e) { 
                  e.printStackTrace();
              } finally { 
                  //设置默认策略
                  influxDB.setRetentionPolicy("sensor_retention");    
              }
              //设置日志输出级别
              influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);  
              return influxDB;
          }
      }
      
      4、InfluxDB原生API实现
      @SpringBootTest(classes = {MainApplication.class})
      @RunWith(SpringJUnit4ClassRunner.class)
      public class InfluxdbDBTest {
          @Autowired
          private InfluxDB influxDB;
          
          //measurement
          private final String measurement = "sensor";
          
          @Value("${spring.influx.database}")
          private String database;
          
          /**
           * 批量插入第一种方式
           */
          @Test
          public void insert(){
              List lines = new ArrayList();       
              Point point = null;     
              for(int i=0;i<50;i++){          
                  point = Point.measurement(measurement)
                  .tag("deviceId", "sensor" + i)
                  .addField("temp", 3)
                  .addField("voltage", 145+i)
                  .addField("A1", "4i")
                  .addField("A2", "4i").build();
                  lines.add(point.lineProtocol());
              }
              //写入
              influxDB.write(lines);
          }
          
          /**
           * 批量插入第二种方式
           */
          @Test
          public void batchInsert(){
              BatchPoints batchPoints = BatchPoints
                      .database(database)
                      .consistency(InfluxDB.ConsistencyLevel.ALL)
                      .build();
            //遍历sqlserver获取数据
            for(int i=0;i<50;i++){
              //创建单条数据对象——表名
              Point point = Point.measurement(measurement)
                //tag属性——只能存储String类型
                      .tag("deviceId", "sensor" + i)
                      .addField("temp", 3)
                      .addField("voltage", 145+i)
                      .addField("A1", "4i")
                      .addField("A2", "4i").build();
              //将单条数据存储到集合中
              batchPoints.point(point);
            }
            //批量插入
            influxDB.write(batchPoints); 
          }
          
          /**
           * 获取数据
           */
          @Test
          public void datas(@RequestParam Integer page){
              int pageSize = 10;
              // InfluxDB支持分页查询,因此可以设置分页查询条件
              String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
              
              String queryCondition = "";  //查询条件暂且为空
              // 此处查询所有内容,如果
              String queryCmd = "SELECT * FROM "
                  // 查询指定设备下的日志信息
                  // 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加;
                  // + 策略name + "." + measurement
                  + measurement
                  // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
                  + queryCondition
                  // 查询结果需要按照时间排序
                  + " ORDER BY time DESC"
                  // 添加分页查询条件
                  + pageQuery;
              
              QueryResult queryResult = influxDB.query(new Query(queryCmd, database));
              System.out.println("query result => "+queryResult);
          }
      }
      
      5、采用封装工具类
      1、创建实体类
      @Data
      @Measurement(name = "sensor")
      public class Sensor {
          @Column(name="deviceId",tag=true)
          private String deviceId;
          
          @Column(name="temp")
          private float temp;
          
          @Column(name="voltage")
          private float voltage;
          
          @Column(name="A1")
          private float A1;
          
          @Column(name="A2")
          private float A2;
          
          @Column(name="time")
          private String time;    
          
      }
      
      2、创建工具类
      @Component
      public class InfluxdbUtils {
          @Autowired
          private InfluxDB influxDB;
          
          @Value("${spring.influx.database}")
          private String database;    
          
          /**
           * 新增单条记录,利用java的反射机制进行新增操作
           */
          @SneakyThrows
          public void insertOne(Object obj){
              //获取度量
              Class clasz = obj.getClass();
              Measurement measurement = clasz.getAnnotation(Measurement.class);
              //构建
              Point.Builder builder = Point.measurement(measurement.name());
              // 获取对象属性
              Field[] fieldArray = clasz.getDeclaredFields();
              Column column = null;
              for(Field field : fieldArray){
                      column = field.getAnnotation(Column.class);
                      //设置属性可操作
                      field.setAccessible(true); 
                      if(column.tag()){
                          //tag属性只能存储String类型
                          builder.tag(column.name(), field.get(obj).toString());
                      }else{
                          //设置field
                          if(field.get(obj) != null){
                              builder.addField(column.name(), field.get(obj).toString());
                          }
                      }
              }
              influxDB.write(builder.build());
          }
          
          /**
           * 批量新增,方法一
           */
          @SneakyThrows
          public void insertBatchByRecords(List records){
              List lines = new ArrayList();   
              records.forEach(record->{
                  Class clasz = record.getClass();
                  //获取度量
                  Measurement measurement = clasz.getAnnotation(Measurement.class);
                  //构建
                  Point.Builder builder = Point.measurement(measurement.name());
                  Field[] fieldArray = clasz.getDeclaredFields();
                  Column column = null;
                  for(Field field : fieldArray){
                          column = field.getAnnotation(Column.class);
                          //设置属性可操作
                          field.setAccessible(true); 
                          if(column.tag()){
                              //tag属性只能存储String类型
                              builder.tag(column.name(), field.get(record).toString());
                          }else{
                              //设置field
                              if(field.get(record) != null){
                                  builder.addField(column.name(), field.get(record).toString());
                              }
                          }
                  }
                  lines.add(builder.build().lineProtocol());
              });
              influxDB.write(lines);
          }
          
          /**
           * 批量新增,方法二
           */
          @SneakyThrows
          public void insertBatchByPoints(List records){
              BatchPoints batchPoints = BatchPoints.database(database)
                      .consistency(InfluxDB.ConsistencyLevel.ALL)
                      .build();
              records.forEach(record->{
                  Class clasz = record.getClass();
                  //获取度量
                  Measurement measurement = clasz.getAnnotation(Measurement.class);
                  //构建
                  Point.Builder builder = Point.measurement(measurement.name());
                  Field[] fieldArray = clasz.getDeclaredFields();
                  Column column = null;
                  for(Field field : fieldArray){
                          column = field.getAnnotation(Column.class);
                          //设置属性可操作
                          field.setAccessible(true); 
                          if(column.tag()){
                              //tag属性只能存储String类型
                              builder.tag(column.name(), field.get(record).toString());
                          }else{
                              //设置field
                              if(field.get(record) != null){
                                  builder.addField(column.name(), field.get(record).toString());
                              }
                          }
                  }
                  batchPoints.point(builder.build());
              });
              influxDB.write(batchPoints);
          }
          
          /**
           * 查询,返回Map集合
           * @param query 完整的查询语句
           */
          public List fetchRecords(String query){
              List results = new ArrayList();
              QueryResult queryResult = influxDB.query(new Query(query, database));
              queryResult.getResults().forEach(result->{
                  result.getSeries().forEach(serial->{
                      List columns = serial.getColumns();
                      int fieldSize = columns.size();
                      serial.getValues().forEach(value->{     
                          Map obj = new HashMap();
                          for(int i=0;i   
                              obj.put(columns.get(i), value.get(i));
                          }
                          results.add(obj);
                      });
                  });
              });
              return results;
          }
          
          /**
           * 查询,返回map集合
           * @param fieldKeys 查询的字段,不可为空;不可为单独的tag
           * @param measurement 度量,不可为空;
           */
          public List fetchRecords(String fieldKeys, String measurement){
              StringBuilder query = new StringBuilder();
              query.append("select ").append(fieldKeys).append(" from ").append(measurement);     
              return this.fetchRecords(query.toString());
          }
          
          /**
           * 查询,返回map集合
           * @param fieldKeys 查询的字段,不可为空;不可为单独的tag
           * @param measurement 度量,不可为空;
           */
          public List fetchRecords(String fieldKeys, String measurement, String order){
              StringBuilder query = new StringBuilder();
              query.append("select ").append(fieldKeys).append(" from ").append(measurement);
              query.append(" order by ").append(order);       
              return this.fetchRecords(query.toString());
          }
          
          /**
           * 查询,返回map集合
           * @param fieldKeys 查询的字段,不可为空;不可为单独的tag
           * @param measurement 度量,不可为空;
           */
          public List fetchRecords(String fieldKeys, String measurement, String order, String limit){
              StringBuilder query = new StringBuilder();
              query.append("select ").append(fieldKeys).append(" from ").append(measurement);
              query.append(" order by ").append(order);
              query.append(limit);
              return this.fetchRecords(query.toString());
          }
          
          /**
           * 查询,返回对象的list集合
           */
          @SneakyThrows
          public  List fetchResults(String query, Class clasz){
              List results = new ArrayList<>();
              QueryResult queryResult = influxDB.query(new Query(query, database));
              queryResult.getResults().forEach(result->{
                  result.getSeries().forEach(serial->{
                      List columns = serial.getColumns();
                      int fieldSize = columns.size();     
                      serial.getValues().forEach(value->{ 
                          Object obj = null;
                              obj = clasz.newInstance();
                              for(int i=0;i   
                                  String fieldName = columns.get(i);
                                  Field field = clasz.getDeclaredField(fieldName);
                                  field.setAccessible(true);
                                  Class type = field.getType();
                                  if(type == float.class){
                                      field.set(obj, Float.valueOf(value.get(i).toString()));
                                  }else{
                                      field.set(obj, value.get(i));
                                  }                           
                              }
                          results.add(obj);
                      });
                  });
              });
              return results;
          }
          
          /**
           * 查询,返回对象的list集合
           */
          public  List fetchResults(String fieldKeys, String measurement, Class clasz){
              StringBuilder query = new StringBuilder();
              query.append("select ").append(fieldKeys).append(" from ").append(measurement);     
              return this.fetchResults(query.toString(), clasz);
          }
          
          /**
           * 查询,返回对象的list集合
           */
          public  List fetchResults(String fieldKeys, String measurement, String order, Class clasz){
              StringBuilder query = new StringBuilder();
              query.append("select ").append(fieldKeys).append(" from ").append(measurement);
              query.append(" order by ").append(order);
              return this.fetchResults(query.toString(), clasz);
          }
          
          /**
           * 查询,返回对象的list集合
           */
          public  List fetchResults(String fieldKeys, String measurement, String order, String limit, Class clasz){
              StringBuilder query = new StringBuilder();
              query.append("select ").append(fieldKeys).append(" from ").append(measurement);
              query.append(" order by ").append(order);
              query.append(limit);        
              return this.fetchResults(query.toString(), clasz);
          }
      }
       
      
      3、使用工具类的测试代码
      @SpringBootTest(classes = {MainApplication.class})
      @RunWith(SpringJUnit4ClassRunner.class)
      public class InfluxdbUtilTest {
          @Autowired
          private InfluxdbUtils influxdbUtils;
          
          /**
           * 插入单条记录
           */
          @Test
          public void insert(){
            Sensor sensor = new Sensor();
            sensor.setA1(10);
            sensor.setA2(10);
            sensor.setDeviceId("0002");
            sensor.setTemp(10L);
            sensor.setTime("2021-01-19");
            sensor.setVoltage(10);
            influxdbUtils.insertOne(sensor);
          }
          
          /**
           * 批量插入第一种方式
           */
          @GetMapping("/index22")
          public void batchInsert(){  
              List sensorList = new ArrayList();
              for(int i=0; i<50; i++){
                  Sensor sensor = new Sensor();
                  sensor.setA1(2);
                  sensor.setA2(12);
                  sensor.setTemp(9);
                  sensor.setVoltage(12);
                  sensor.setDeviceId("sensor4545-"+i);
                  sensorList.add(sensor);
              }
              influxdbUtils.insertBatchByRecords(sensorList);
          }
          
          /**
           * 批量插入第二种方式
           */
          @GetMapping("/index23")
          public void batchInsert1(){ 
              List sensorList = new ArrayList();
              Sensor sensor = null;
              for(int i=0; i<50; i++){
                  sensor = new Sensor();
                  sensor.setA1(2);
                  sensor.setA2(12);
                  sensor.setTemp(9);
                  sensor.setVoltage(12);
                  sensor.setDeviceId("sensor4545-"+i);
                  sensorList.add(sensor);
              }
              influxdbUtils.insertBatchByPoints(sensorList);
          }
              
          /**
           * 查询数据
           */
          @GetMapping("/datas2")
          public void datas(@RequestParam Integer page){
              int pageSize = 10;
              // InfluxDB支持分页查询,因此可以设置分页查询条件
              String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
              
              String queryCondition = "";  //查询条件暂且为空
              // 此处查询所有内容,如果
              String queryCmd = "SELECT * FROM sensor"
                  // 查询指定设备下的日志信息
                  // 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加;
                  // + 策略name + "." + measurement
                  // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
                  + queryCondition
                  // 查询结果需要按照时间排序
                  + " ORDER BY time DESC"
                  // 添加分页查询条件
                  + pageQuery;
              
              List sensorList = influxdbUtils.fetchRecords(queryCmd);
              System.out.println("query result => {}"+sensorList );
          }
          
          /**
           * 获取数据
           */
          @GetMapping("/datas21")
          public void datas1(@RequestParam Integer page){
              int pageSize = 10;
              // InfluxDB支持分页查询,因此可以设置分页查询条件
              String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
              
              String queryCondition = "";  //查询条件暂且为空
              // 此处查询所有内容,如果
              String queryCmd = "SELECT * FROM sensor"
                  // 查询指定设备下的日志信息
                  // 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加;
                  // + 策略name + "." + measurement
                  // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
                  + queryCondition
                  // 查询结果需要按照时间排序
                  + " ORDER BY time DESC"
                  // 添加分页查询条件
                  + pageQuery;
              List sensorList = influxdbUtils.fetchResults(queryCmd, Sensor.class);
              //List sensorList = influxdbUtils.fetchResults("*", "sensor", Sensor.class);
              sensorList.forEach(sensor->{
                  System.out.println("query result => {}"+sensorList );
              });     
          }
      }
       
      
      6、采用封装数据模型的方式
      1、在Influxdb库中创建存储策略
      CREATE RETENTION POLICY "rp_order_payment" ON "db_order" DURATION 30d REPLICATION 1 DEFAULT
      
      2、创建数据模型
      @Data
      @Measurement(name = "m_order_payment",
      		database = "db_order", 
      		retentionPolicy = "rp_order_payment")
      public class OrderPayment implements Serializable  {
          // 统计批次
          @Column(name = "batch_id", tag = true)
          private String batchId;
          // 哪个BU
          @Column(name = "bu_id", tag = true)
          private String buId;
          // BU 名称
          @Column(name = "bu_name")
          private String buName;
          // 总数
          @Column(name = "total_count", tag = true)
          private String totalCount;
          // 支付量
          @Column(name = "pay_count", tag = true)
          private String payCount;
          // 金额
          @Column(name = "total_money", tag = true)
          private String totalMoney;
      }
      
      3、创建Mapper
      public class InfluxMapper extends InfluxDBMapper {
          public InfluxMapper(InfluxDB influxDB) {
              super(influxDB);
          }
      }
      
      4、配置Mapper
      @Log4j2
      @Configuration
      public class InfluxAutoConfiguration {
          @Bean
          public InfluxMapper influxMapper(InfluxDB influxDB) {
              InfluxMapper influxMapper = new InfluxMapper(influxDB);
              return influxMapper;
          }
      }
      
      5、测试CRUD
      @SpringBootTest(classes = {MainApplication.class})
      @RunWith(SpringJUnit4ClassRunner.class)
      public class InfluxdbMapperTest {
          @Autowired
          private InfluxMapper influxMapper;
          @Test
          public void save(OrderPayment product) {
              influxMapper.save(product);
          }
          @Test
          public void queryAll() {
              List products = influxMapper.query(OrderPayment.class);
              System.out.println(products);
          }
          @Test
          public void queryByBu(String bu) {
              String sql = String.format("%s'%s'", "select * from m_order_payment where bu_id = ", bu);
              Query query = new Query(sql, "db_order");
              List products = influxMapper.query(query, OrderPayment.class);
              System.out.println(products);
          }
      }
      

      参考:https://blog.csdn.net/cpongo1/article/details/89550486

      https://github.com/SpringForAll/spring-boot-starter-hbase

      https://github.com/JeffLi1993/springboot-learning-example