欢迎投稿

今日深度:

influxdb数据库常用命令及SpringBoot整合,

influxdb数据库常用命令及SpringBoot整合,


目录
  • 一、influxdb一些概念
    • 1、时序数据库
    • 2、influxdb和mysql类比
    • 3、points(类似表的一行数据)数据结构
    • 4、measurement特性
    • 5、Tag和Field
    • 6、Series
  • 二、数据库、表操作命令
    • 1、数据库操作
    • 2、表(插入)操作
  • 三、influxdb的sql操作(查询)
    • 1、select
    • 2、where语句
    • 3、group by语句
    • 4、into语句
    • 5、子查询
    • 6、INTO导入到新表,导致数据丢失问题
  • 四、influxdb保留策略(RP)
    • 五、influxdb连续查询(CQ)
      • 六、SpringBoot整合influxdb
        • 七、influxdb图形化界面客户端

          一、influxdb一些概念

          Influxdb是一个开源的分布式时序、时间和指标数据库,使用go语言编写,无需外部依赖。

          注意: influxdb只针对单机版开源,集群版还是要收费的。

          1、时序数据库

          时间序列数据库是按照时间顺序来记录数据,比如我们想存储服务器CPU和内存分钟级的使用率,即一分钟产生一条数据,时序数据库就是最佳选择。

          注意:时序数据库是不支持删除和更新的

          2、influxdb和mysql类比

          在这里插入图片描述

          3、points(类似表的一行数据)数据结构

          在这里插入图片描述

          4、measurement特性

          • measurement类似mysql中的表,无需单独创建,在第一次插入数据时自动创建,measurement中无数据,表也就不存在了。
          • measurement没有update语句,无法修改measurement以及tags-key和fields-key名称,实际中如有需要只能删除整个measurement重新插入数据

          5、Tag和Field

          Tag: 相当于mysql的索引,Group By查询时只能用tag和time字段,数据类型只能是String

          Field: 在Group By查询中,函数操作只能用Field字段,例如sum();

          Field支持数据类型有int,float,boolean类型

          讲白了,当我们插入机器的cpu使用率时:主机的IP,Name等信息用Tag存,而使用率大小则用Field存。

          6、Series

          InfluxDB中的series是一种集合的概念,在同一个database中,相同retention policy、相同measurement、相同tag的数据属于一个series集合,同一个series的数据在物理上按照时间顺序排列在一起。

          series就是不同tag列,进行排列组合。比如一个表中有两个tag字段A和B。 A字段有有3种值,B字段有4种值,则series个数 = 3*4=12个。

          二、数据库、表操作命令

          1、数据库操作

          //通过cli操作influxdb,(登录influxdb)
          influx -username root -password root
          //查看所有数据库
          show databases
          //进入到某个数据库
          use influxdb
          //查看用户,两列数据,一列是用户名称,一列是是否为管理员用户
          show users
          //创建普通户
          create user "influx" with password '123456'
          //创建管理员用户
          create user "root" with password '123456' with all privileges
          //修改用户密码
          set password for root= 'root'
          /**
          * 数据库设置admin权限的两种方式(建议使用第二种)
          */
          GRANT ALL PRIVILEGES ON 数据库名称  TO 用户名称
          GRANT ALL PRIVILEGES TO 用户名称
          /**
          * 数据库撤销admin权限的两种方式(建议使用第二种)
          */
          Revoke ALL PRIVILEGES ON 数据库名称 FROM 用户名称
          Revoke ALL PRIVILEGES FROM 用户名称
          //查看所有表
          show measurements
          //查看表的tag
          show tag keys from 表名称
          //查看表的field
          show field keys from 表名称
          //查看表的series
          show series from "表名称"
          //查看当前库的series
          show series

          2、表(插入)操作

          一般情况下,经常作为查询条件的列,在初始时设置为tag,即索引,fields不常作为查询条件,更甚者,在复杂查询语法中,聚合语句group by 后面只能是time和tags。 一个measurement 是有多个tag和field的。

          /**
          * InfluxDB中没有显式的新建表的语句,只能通过insert数据的方式来建立新表。
          * 其中 cpu_usage 就是表名,resKey索引(tag),value=xx是记录值(field),索引和记录值可以有多个,后面的时间戳是我们指定的,如果不指定系统自动加当前时间时间戳。
          */
          insert cpu_usage resKey=470b14f0-e869-43ed-a8e6-fd634258271f value=0.96516
          #指定时间戳插入
          insert cpu_usage resKey=470b14f0-e869-43ed-a8e6-fd634258271f value=0.96516 1688794582731
          //删除表
          drop measurement cpu_usage
          //删除数据
          //influxDB是没有提供直接删除数据记录的方法,但是提供数据保存策略,主要用于指定数据保留时间,超过指定时间,就删除这部分数据。(数据库过期策略至少一个小时)

          三、influxdb的sql操作(查询)

          //查询基本构成,中括号[]中间的,都是可选的部分
          SELECT * [INTO_clause] FROM "表名" [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT N]

          1、select

          查询注意:

          • 在SELECT 子句中,必须要有至少一个field key!如果在SELECT子句中只有一个或多个tag key,那么该查询会返回空。这是由InfluxDB底层存储数据的方式所导致的结果。
          • 在查询中,发现插入的时间会比实际时间少8小时。那是因为InfluxDB默认以UTC时间存储并返回时间戳,当接收到一个时序数据记录时,InfluxDB将时间戳从本地时区时间转换为UTC时间并存储,查询时,InfluxDB返回的时间戳对应的是UTC时间 。 在sql后面加上tz(‘Asia/Shanghai’),可以将UTC时间转换为中国本地时间, 如果加上tz(‘Asia/Shanghai’)后报错,安装下go语言的安装包即可解决。
          • 如果查询时,发现表里一查不出来数据,可能是表数据太大导致,在后面加个limit 10,也可能是表明没加上保存策略导致的,保存策略下面有讲。
          SELECT * FROM "cpu_usage" limit 10 tz('Asia/Shanghai')

          4、在field上做一些基本计算

          SELECT (value+400)*10 FROM "cpu_usage" limit 10 tz('Asia/Shanghai')

          2、where语句

          注意事项:

          • 在InfluxDB的WHERE子句中,不支持使用 OR 来指定不同的time区间,否则将为空。比如:time=1688552647654 or time=1688552647666
          • 支持对string, boolean, float 和 integer类型的field values进行比较,如果是string类型的field value,一定要用单引号括起来。如果不适用引号括起来,或者使用的是双引号,将不会返回任何数据,有时甚至都不报错。
          支持的操作符:
          = 等于
          <> 不等于
          != 不等于
          > 大于
          >= 大于等于
          < 小于
          <= 小于等于
          // 查询有特定field的key value为字符串的数据
          SELECT * FROM "cpu_usage" WHERE "resKey" = 'qqqqqq'  tz('Asia/Shanghai')
          // 查询有特定field的key value并且带计算的数据
          SELECT (value+400)*10 FROM "cpu_usage" limit 10 tz('Asia/Shanghai')
          • 在WHERE子句中的tag values,也要用单引号括起来。如果不用引号括起来,或者使用双引号,则查询不会返回任务数据。甚至不会报错。
          • 根据时间戳来过滤数据。
          //查询10天前的数据
          SELECT * FROM "cpu_usage" WHERE time > now() - 10d
          //根据指定时间过滤
          SELECT * FROM "cpu_usage" WHERE time >= '2023-06-18T00:00:00Z' AND time <= '2024-08-18T12:30:00Z'

          3、group by语句

          注意事项:

          • 有 GROUP BY的查询,需要查询的字段一定要经过函数。
          • 对多个 tag 和单个tag做 GROUP BY, 求字段value的平均值
          SELECT MEAN("value") FROM "temperature" GROUP BY "field1", "field2"
          • 根据时间戳进行 GROUP BY time(time_interval) ,查询指定时间区间,以30分钟间隔分组
          SELECT MEAN(value) FROM "cpu_usage" WHERE time >= '2023-07-01T10:00:00Z' AND time < '2024-08-02T00:00:00Z' GROUP BY time(30m) tz('Asia/Shanghai')
          • 根据时间戳进行GROUP BY time(time_interval, offset_interval),查询指定时间区间,以30分钟间隔分组,offset_interval 是一个持续时间。它向前或向后移动 InfluxDB 的预设时间界限,可以为正或负数。
          //上面sql第一条数据是从2023-07-01 18:00:00开始的,如果加上offset_interval值为15m, 则查询第一条数据变成了2023-07-01 17:45:00   
          SELECT MEAN(value) FROM "cpu_usage" WHERE time >= '2023-07-01T10:00:00Z' AND time < '2024-08-02T00:00:00Z' GROUP BY time(30m,15m) tz('Asia/Shanghai')
          • fill()函数用法,它会更改不含数据的时间间隔的返回值。
          //上面sql中,如果需要聚合的时间段每数据,则用fill()中指定的值填充
          SELECT MEAN(value) FROM "cpu_usage" WHERE time >= '2023-07-01T10:00:00Z' AND time < '2023-08-02T00:00:00Z' GROUP BY time(30m) fill(10) tz('Asia/Shanghai')
          • Influxdb中Group By查询中如果没聚合函数,查询会返回该分组内的所有数据。而Mysql中Group By查询中如果没聚合函数,查询只会返回一条数据。 如果查询中有聚合函数,Mysql和Influxdb一样,返回该组内的聚合数,也就只有一条。

          4、into语句

          INTO主要作用是把一个表的数据导入到另外一个表中。插入到新表的字段是group by后的字段和查询的字段取并集,如果没Group By,就只看查询出来的字段。

          #把表cpu_usage的数据导入到备份表cpu_usage_bak中,新表和旧表的tag和Field不变
          SELECT * into "cpu_usage_bak" FROM "cpu_usage" 
          #该sql只会把value字段中插入到备份表cpu_usage_bak中,需要插入的字段在原变和旧表中tag和Field不变
          SELECT value into "cpu_usage_bak" FROM "cpu_usage" 
          #该sql只会把value和resKey字段中插入到备份表cpu_usage_bak中,需要插入的字段在原变和旧表中tag和Field不变
          SELECT value into "cpu_usage_bak" FROM "cpu_usage"  group by resKey
          #和上面sql一样,只不过多个聚合函数
          SELECT max(value) into "cpu_usage_bak" FROM "cpu_usage"  group by resKey
          #该sql插入新表后,字段都会变成field,如果不想变成Filed,查询时加Group By即可
          SELECT value,resKey into "cpu_usage_bak" FROM "cpu_usage" 
          #该sql中存在子查询,从子查询查数来的数据插入到新表,字段都变成了Field
          SELECT value,resKey into  "default"."cpu_usage_bak"   FROM  (select * from "default"."cpu_usage" group by resKey)

          5、子查询

          子查询是嵌套在另一个查询的 FROM 子句中的查询。使用子查询将查询作为条件应用于其他查询。子查询提供与嵌套函数和 SQLHAVING 子句类似的功能。

          #求组内最大值的平均值
          SELECT MEAN("value") FROM (SELECT MAX("value") AS value FROM "cpu_usage" GROUP BY "resKey")
          #求两个字段差的平均值
          SELECT MEAN("difference") FROM (SELECT "字段1" - "字段2" AS "difference" FROM "cpu_usage")

          6、INTO导入到新表,导致数据丢失问题

          Influxdb的内部机制是基于时间的时序性数据库,每一条记录都会有一个时间标识,如果客户端记录没给这个时间,influxdb会提供。主键是由time + tag组成,是不可以重复的,如果重复,后面的将覆盖前面的。 相同的 measurement,tagset 和 timestamp 的数据会覆盖。 解决方法就是提高时间精度到纳秒。或者增加tag来标识不同的点。

          四、influxdb保留策略(RP)

          什么是保留策略?

          用来定义数据在InfluxDB中存放的时间,就是保留策略,过了这个时间,数据过期会自动删除。 一个数据库可以有多个保留策略,但每个策略必须是独一无二的。 保留策略实际就类似于Mysql中的定时任务

          //查看数据保存策略(建议使用第二种)
          show retention policies on "数据库名称"
          show retention policies
          /**
          * 创建数据保留策略
          * h(小时),d(天),w(星期)
          */
          //CREATE RETENTION POLICY "保留策略名称" ON "数据库名称" DURATION "该保留策略对应的数据过期时间" REPLICATION "复制因子,开源的InfluxDB单机环境永远为1" SHARD DURATION "分片组的默认时长" DEFAULT
          //DEFAULT是默认保存策略,一个库中只有一个默认保存策略。
          create retention policy "testPolice" on myInfluxdb duration 72h    replication 1 SHARD DURATION 1h default
          //修改保留策略
          alter retention policy "保留策略名称" on "数据库名称" duration 1d
          //删除保留策略
          drop retention policy "保留策略名称" on "数据库名称"

          备注:如果查询influxdb没数据,一定要在表名加上保存策略。

          五、influxdb连续查询(CQ)

          什么是连续查询? 它会按照用户指定的查询规则,自动地、周期地查询实时数据并执行指定运算,然后将查询结果保存在一张指定的表中。

          举个例子来理解连续查询:现有CPU使用率数据分钟级别,需要把分钟级别的数据汇聚成小时级别的(生成的小时级别数据插入小时表中)。 我们只需要创建个连续查询,influxdb会自动实时的把数据插入到小时表中。

          注意:保存策略和连续查询往往配合使用的。

          用一个需求更加理解保存策略和连续查询的用法:

          1、把分钟级别数据保存两个小时。

          2、把小时级别数据保存10周

          实现方式:

          //创建分钟级别的保存策略:two_hours是策略名称,rightcloud是库名,
          CREATE RETENTION POLICY "two_hours" ON "rightcloud" DURATION 2h REPLICATION 1
          //创建小时级别的保存策略:a_year是策略名称,rightcloud是库名
          CREATE RETENTION POLICY "a_year" ON "rightcloud" DURATION 10w REPLICATION 1
          //创建连续查询:con_query连续查询名称
          CREATE CONTINUOUS QUERY "con_query" ON "rightcloud" BEGIN
            SELECT mean("value") AS "value"
            INTO "a_year"."cpu_usage"
            FROM "autogen"."cpu_usage"
            GROUP BY time(10s)
          END
          //注意:分钟级别数据和小时级别数据都是保存在cpu_usage表中,是通过保存策略来区分开的,在查询表数据时,一定要加上策略名称
          select * from "a_year"."cpu_usage";
          select * from "autogen"."cpu_usage";

          六、SpringBoot整合influxdb

          1、引入maven

          server:
            port: 8080
          spring:
            influx:
              url: http://127.0.0.1:8086 #influxdb服务器的地址
              username:  #用户名
              password:  #密码
              database: rightcloud #指定的数据库
              retention_policy: autogen

          2、application.yml

          server:  port: 8080spring:  influx:    url: //127.0.0.1:8086 #influxdb服务器的地址    username:  #用户名    password:  #密码    database: rightcloud #指定的数据库    retention_policy: autogen

          3、InfluxdbConfig配置类

          import org.influxdb.InfluxDB;
          import org.influxdb.InfluxDBFactory;
          import org.springframework.beans.factory.annotation.Value;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.util.StringUtils;
          /**
           * influxdb配置文件
           */
          @Configuration
          public class InfluxdbConfig {
              @Value("${spring.influx.url}")
              private String influxDBUrl;
              @Value("${spring.influx.username}")
              private String userName;
              @Value("${spring.influx.password}")
              private String password;
              @Value("${spring.influx.retention_policy}")
              private String retention_policy;
              @Value("${spring.influx.database}")
              private String database;
              @Bean
              public InfluxDB influxdb() {
                  InfluxDB influxDB = null;
                  if (StringUtils.isEmpty(userName)) {
                      influxDB = InfluxDBFactory.connect(influxDBUrl);
                  } else {
                      influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password);
                  }
                  try {
                      /**
                       * 异步插入:
                       * enableBatch这里第一个是point的个数,第二个是时间,单位毫秒
                       * point的个数和时间是联合使用的,如果满100条或者60 * 1000毫秒
                       * 满足任何一个条件就会发送一次写的请求。
                       */
          //            influxDB.setDatabase(database).enableBatch(100, 1000 * 60, TimeUnit.MILLISECONDS);
                      influxDB.setDatabase(database);
                  } catch (Exception e) {
                      e.printStackTrace();
                  } finally {
                      //设置默认策略
                      this.retention_policy = retention_policy == null || "".equals(retention_policy) ? "autogen" : retention_policy;
                      influxDB.setRetentionPolicy(retention_policy);
                  }
                  //设置日志输出级别
                  influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
                  return influxDB;
              }
          }

          4、InfluxDBUtils工具类,提供增删改查方法

          import org.influxdb.InfluxDB;
          import org.influxdb.annotation.Column;
          import org.influxdb.annotation.Measurement;
          import org.influxdb.dto.BatchPoints;
          import org.influxdb.dto.Point;
          import org.influxdb.dto.Query;
          import org.influxdb.dto.QueryResult;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.beans.factory.annotation.Value;
          import org.springframework.stereotype.Component;
          import java.lang.reflect.Field;
          import java.util.ArrayList;
          import java.util.HashMap;
          import java.util.List;
          import java.util.Map;
          import java.util.concurrent.TimeUnit;
          /**
           *
           */
          @Component
          public class InfluxDBUtils {
              @Autowired
              private InfluxDB influxDB;
              @Value("${spring.influx.database}")
              private String database;
              /**
               * 插入单条数据写法1
               *
               * @param measurement
               */
              public void insertOne01(String measurement) {
                  //构建
                  Point.Builder builder = Point.measurement(measurement);
                  //可指定时间戳
                  builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                  //tag属性只能存储String类型
                  builder.tag("name", "zhanggang");
                  //设置field
                  builder.addField("filed", "fileValue");
                  influxDB.write(builder.build());
              }
              /**
               * 插入单条数据写法2
               *
               * @param measurement
               */
              public void insertOne02(String measurement, Map<String, String> tags, Map<String, Object> fields) {
                  //构建
                  Point.Builder builder = Point.measurement(measurement);
                  //可指定时间戳
                  builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                  //tag属性只能存储String类型
                  builder.tag(tags);
                  //设置field
                  builder.fields(fields);
                  influxDB.write(builder.build());
              }
              /**
               * 插入单条数据
               * influxDB开启UDP功能, 默认端口:8089,默认数据库:udp,没提供代码传数据库功能接口
               * 使用UDP的原因
               * TCP数据传输慢,UDP数据传输快。
               * 网络带宽需求较小,而实时性要求高。
               * InfluxDB和服务器在同机房,发生数据丢包的可能性较小,即使真的发生丢包,对整个请求流量的收集影响也较小。
               *
               * @param measurement
               * @param tags
               * @param fields
               */
              public void insertUDPOne03(String measurement, Map<String, String> tags, Map<String, Object> fields) {
                  //构建
                  Point.Builder builder = Point.measurement(measurement);
                  //可指定时间戳
                  builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                  //tag属性只能存储String类型
                  builder.tag(tags);
                  //设置field
                  builder.fields(fields);
                  int udpPort = 8089;
                  influxDB.write(udpPort, builder.build());
              }
              //批量插入1
              public void insertBatchByRecords() {
                  List<String> lines = new ArrayList<String>();
                  String measurement = "test-inset-one";
                  for (int i = 0; i < 2; i++) {
                      Point.Builder builder = Point.measurement(measurement);
                      //tag属性只能存储String类型
                      builder.tag("name", "zhanggang" + i);
                      //设置field
                      builder.addField("filed", "fileValue" + i);
                      lines.add(builder.build().lineProtocol());
                  }
                  influxDB.write(lines);
              }
              //批量插入2
              public void insertBatchByPoints() {
                  BatchPoints batchPoints = BatchPoints.database(database)
                          .consistency(InfluxDB.ConsistencyLevel.ALL)
                          .build();
                  String measurement = "test-inset-one";
                  for (int i = 0; i < 2; i++) {
                      Point.Builder builder = Point.measurement(measurement);
                      //tag属性只能存储String类型
                      builder.tag("name", "zhanggang" + i);
                      //设置field
                      builder.addField("filed", "fileValue" + i);
                      batchPoints.point(builder.build());
                  }
                  influxDB.write(batchPoints);
              }
              /**
               * 查询,返回Map集合
               *
               * @param query 完整的查询语句
               * @return
               */
              public List<Map<String, Object>> fetchRecords(String query) {
                  List<Map<String, Object>> results = new ArrayList<>();
                  QueryResult queryResult = influxDB.query(new Query(query, database));
                  queryResult.getResults().forEach(result -> {
                      result.getSeries().forEach(serial -> {
                          List<String> columns = serial.getColumns();
                          int fieldSize = columns.size();
                          serial.getValues().forEach(value -> {
                              Map<String, Object> obj = new HashMap<String, Object>();
                              for (int i = 0; i < fieldSize; i++) {
                                  obj.put(columns.get(i), value.get(i));
                              }
                              results.add(obj);
                          });
                      });
                  });
                  return results;
              }
              /**
               * 批量写入数据
               *
               * @param database        数据库
               * @param retentionPolicy 保存策略
               * @param consistency     一致性
               * @param records         要保存的数据(调用BatchPoints.lineProtocol()可得到一条record)
               */
              public void batchInsert(final String database, final String retentionPolicy,
                                      final InfluxDB.ConsistencyLevel consistency, final List<String> records) {
                  influxDB.write(database, retentionPolicy, consistency, records);
              }
              /**
               * 查询
               *
               * @param command 查询语句
               * @return
               */
              public QueryResult query(String command) {
                  return influxDB.query(new Query(command, database));
              }
              /**
               * 创建数据保留策略
               * 设置数据保存策略 defalut 策略名 /database 数据库名/ 30d 数据保存时限30天/ 1 副本个数为1/ 结尾DEFAULT
               * 表示 设为默认的策略
               */
              public void createRetentionPolicy() {
                  String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
                          "defalut", database, "30d", 1);
                  this.query(command);
              }
           }

          5、在Controller类进行操作

          @RestController
          public class InfluxDBController {
              @Autowired
              private InfluxDBUtils influxDBUtils;
              @GetMapping("/insertOne")
              public String insertOne() {
                  String measurement = "test-inset-one";
                  influxDBUtils.insertOne01(measurement);
                  return "success";
              }
              @GetMapping("/query01")
              public List<Map<String, Object>> query01() {
                  String sql = "SELECT * FROM \"test-inset-one\"   TZ('Asia/Shanghai')";
                  return influxDBUtils.fetchRecords(sql);
              }

          七、influxdb图形化界面客户端

          InfluxDB Studio工具图形化界面操作Influxdb

          在这里插入图片描述

          到此这篇关于influxdb数据库常用命令及SpringBoot整合的文章就介绍到这了,更多相关influxdb数据库常用命令内容请搜索PHP之友以前的文章或继续浏览下面的相关文章希望大家以后多多支持PHP之友!

          您可能感兴趣的文章:
          • Springboot使用influxDB时序数据库的实现
          • docker环境搭建JMeter+Grafana+influxdb可视化性能监控平台的教程
          • 借助Docker搭建JMeter+Grafana+Influxdb监控平台的详细教程
          • docker安装influxdb的详细教程(性能测试)
          • Linux下安装grafana并且添加influxdb监控的方法

          www.htsjk.Com true http://www.htsjk.com/shujukunews/46793.html NewsArticle influxdb数据库常用命令及SpringBoot整合, 目录 一、influxdb一些概念 1、时序数据库 2、influxdb和mysql类比 3、points(类似表的一行数据)数据结构 4、measurement特性 5、Tag和Field 6、Series 二、数...
          评论暂时关闭