Soffio

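Summary

Time series databases are optimized for time series data: they are built for high write throughput and efficient queries over time ranges.

Core techniques: columnar storage, Gorilla compression, downsampling, sharding.

Representative products: InfluxDB, Prometheus, TimescaleDB.

Use cases: monitoring, IoT, finance, log analytics.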

The Evolution of Time Series Databases: From RRDtool to InfluxDB

Time series visualization

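Introduction: The Explosive Growth of Time Series Data

Time series data is a sequence of data points ordered by time. IoT sensor readings, stock prices, server CPU utilization, website traffic: time series data is everywhere.

Characteristics of time series data

  • High write throughput: millions or even tens of millions of data points per second
  • Time as the primary key: once written, data is rarely updated or deleted
  • Range queries: fetching the data within a given time span
  • Aggregation: averages, maxima, percentiles, trend forecasting
  • Temporal locality: the most recent data is accessed most often

Traditional relational databases struggle to serve this workload efficiently, which gave rise to dedicated time series databases (TSDB, Time Series Database).

DB-Engines has ranked time series databases among the fastest-growing database categories, driven by rapid growth in IoT, observability, and fintech.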

TSDB market growth

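History: From RRDtool to the Modern TSDB

Generation 1: RRDtool (1999)

RRDtool (Round-Robin Database tool), created by Tobias Oetiker, is one of the earliest tools for storing time series data.

Core design

  • Fixed-size circular buffers
  • Predefined data retention policies
  • Automatic downsampling (consolidation)
# Example RRDtool create command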
rrdtool create temperature.rrd \
  --start 1609459200 \
  --step 60 \
  DS:temp:GAUGE:120:-50:50 \
  RRA:AVERAGE:0.5:1:1440 \
  RRA:AVERAGE:0.5:5:2016 \
  RRA:AVERAGE:0.5:60:8760
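
As a rough check on how much history each archive in the command above holds, the sketch below multiplies the step by the steps per row and the row count. The `Rra` type and the `retentionSeconds` helper are hypothetical names used for illustration; they are not part of RRDtool.

```typescript
// Hypothetical helper: how much history one round-robin archive (RRA) covers.
interface Rra {
  stepsPerRow: number; // primary data points consolidated into one row
  rows: number;        // number of rows kept in the ring buffer
}

function retentionSeconds(stepSeconds: number, rra: Rra): number {
  return stepSeconds * rra.stepsPerRow * rra.rows;
}

const step = 60; // matches --step 60 in the command above
const archives: Rra[] = [
  { stepsPerRow: 1, rows: 1440 },
  { stepsPerRow: 5, rows: 2016 },
  { stepsPerRow: 60, rows: 8760 },
];

const retentionDays = archives.map((a) => retentionSeconds(step, a) / 86400);
console.log(retentionDays); // [1, 7, 365]
```

Under these settings the file keeps one day at 1-minute resolution, one week at 5-minute resolution, and one year at hourly resolution, all in a fixed amount of disk space.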

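Limitations

  • Fixed schema that is hard to extend
  • Single-node storage with no horizontal scaling
  • Updates need to lock the whole file
  • Cannot handle high-cardinality data

Generation 2: Graphite (2008)

Graphite introduced a more flexible architecture made up of three components:

  1. Carbon: receives and stores data
  2. Whisper: a fixed-size time series database file format
  3. Graphite-Web: querying and visualization
# Graphite metric format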
servers.web01.cpu.usage 75.5 1609459200
servers.web01.memory.used 8192 1609459200
servers.web01.disk.io.read 1024 1609459200
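
Each line in the format above is a dotted path, a value, and a Unix timestamp in seconds, separated by whitespace. A minimal parsing sketch follows; `GraphiteSample` and `parseGraphiteLine` are hypothetical names, and the validation rules are assumptions rather than Carbon's exact behavior.

```typescript
interface GraphiteSample {
  path: string[];    // hierarchical name split on "."
  value: number;
  timestamp: number; // Unix seconds
}

// Returns null for lines that do not have exactly three valid parts.
function parseGraphiteLine(line: string): GraphiteSample | null {
  const parts = line.trim().split(/\s+/);
  if (parts.length !== 3) return null;
  const [name, rawValue, rawTimestamp] = parts;
  const value = Number(rawValue);
  const timestamp = Number(rawTimestamp);
  if (name.length === 0 || Number.isNaN(value) || !Number.isInteger(timestamp)) {
    return null;
  }
  return { path: name.split("."), value, timestamp };
}

const sample = parseGraphiteLine("servers.web01.cpu.usage 75.5 1609459200");
console.log(sample?.path); // ["servers", "web01", "cpu", "usage"]
```

Because the hierarchy lives entirely in the name, every new dimension (host, region, and so on) has to be encoded as another path segment.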

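Improvements

  • Hierarchical namespace
  • Flexible aggregation functions
  • Real-time streaming writes

Remaining problems

  • One file per metric, which strains the file system at high cardinality
  • Single-node architecture with limited scalability
  • No built-in tag support in the original design (tags arrived later, in Graphite 1.1)

Generation 3: OpenTSDB (2010)

OpenTSDB is built on top of HBase and introduced the concept of tags: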

{
  "metric": "cpu.usage",
  "timestamp": 1609459200,
  "value": 75.5,
  "tags": {
    "host": "web01",
    "region": "us-east-1",
    "datacenter": "dc1"
  }
}

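Strengths

  • Builds on HBase's distributed capabilities
  • Supports high-cardinality data
  • Flexible tag dimensions

Challenges

  • Depends on HBase, which makes operations complex
  • A long write path with relatively high latency
  • Query performance is bounded by HBase

Generation 4: The Modern TSDB

InfluxDB (2013)

A database designed from the ground up for time series data:

// InfluxDB data model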
interface InfluxDataPoint {
  measurement: string;
  tags: Record<string, string>;
  fields: Record<string, number | string | boolean>;
  timestamp: number;
}

const example = {
  measurement: "cpu",
  tags: { host: "server01", region: "us-east" },
  fields: { usage: 75.5, idle: 24.5 },
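  // Nanoseconds since the epoch. This exceeds Number.MAX_SAFE_INTEGER,
  // so production code would normally use bigint or a string here.
  timestamp: 1609459200000000000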
};

The TSM storage engine (Time-Structured Merge Tree) is optimized for storing and compressing time series data.
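
For illustration, a point shaped like the example above can be serialized to InfluxDB's text-based line protocol (`measurement,tags fields timestamp`). This is a minimal sketch: `Point`, `toLineProtocol`, and the escape helpers are hypothetical names, the timestamp is kept as a string to avoid precision loss (an assumption of this sketch), and integer fields (which need an `i` suffix) are not handled.

```typescript
type FieldValue = number | string | boolean;

interface Point {
  measurement: string;
  tags: Record<string, string>;
  fields: Record<string, FieldValue>;
  timestamp: string; // nanoseconds, kept as a string to avoid precision loss
}

// Tag keys, tag values and field keys escape commas, equals signs and spaces.
const escapeKey = (s: string): string => s.replace(/([,= ])/g, "\\$1");
// Measurement names escape commas and spaces only.
const escapeMeasurement = (s: string): string => s.replace(/([, ])/g, "\\$1");

function formatField(v: FieldValue): string {
  // String fields are double-quoted, with quotes and backslashes escaped.
  if (typeof v === "string") return `"${v.replace(/(["\\])/g, "\\$1")}"`;
  return String(v); // numbers are written as floats, booleans as true/false
}

function toLineProtocol(p: Point): string {
  // Tags are sorted by key so the same series always serializes identically.
  const tags = Object.keys(p.tags)
    .sort()
    .map((k) => `,${escapeKey(k)}=${escapeKey(p.tags[k])}`)
    .join("");
  const fields = Object.keys(p.fields)
    .map((k) => `${escapeKey(k)}=${formatField(p.fields[k])}`)
    .join(",");
  return `${escapeMeasurement(p.measurement)}${tags} ${fields} ${p.timestamp}`;
}

const line = toLineProtocol({
  measurement: "cpu",
  tags: { host: "server01", region: "us-east" },
  fields: { usage: 75.5, idle: 24.5 },
  timestamp: "1609459200000000000",
});
console.log(line);
// cpu,host=server01,region=us-east usage=75.5,idle=24.5 1609459200000000000
```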

Prometheus (2012/2015)

A cloud-native monitoring system that uses a pull-based model:

# Prometheus configuration
scrape_configs:
  - job_name: 'node'
    scrape_interval: 15s
    static_configs:
      - targets: ['localhost:9100']

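Features

  • Built-in multidimensional data model
  • The powerful PromQL query language
  • Service discovery integration
  • Single-node design that is simple and reliable
# PromQL examples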
rate(http_requests_total{job="api"}[5m])

histogram_quantile(0.95, 
  rate(http_request_duration_seconds_bucket[5m])
)
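
To make the `rate(...)` calls above more concrete, here is a simplified sketch of what a per-second rate over a counter involves. It is not Prometheus's implementation: the real function also extrapolates to the edges of the window, which this sketch leaves out. `CounterSample` and `simpleRate` are hypothetical names.

```typescript
interface CounterSample {
  t: number; // timestamp in seconds
  v: number; // monotonically increasing counter value
}

// Average per-second increase across the samples in one window.
function simpleRate(samples: CounterSample[]): number | null {
  if (samples.length < 2) return null;
  let increase = 0;
  for (let i = 1; i < samples.length; i++) {
    const prev = samples[i - 1].v;
    const curr = samples[i].v;
    // A drop means the counter was reset (e.g. a restart), so count from zero.
    increase += curr >= prev ? curr - prev : curr;
  }
  const elapsed = samples[samples.length - 1].t - samples[0].t;
  return elapsed > 0 ? increase / elapsed : null;
}

const windowSamples: CounterSample[] = [
  { t: 0, v: 100 },
  { t: 15, v: 130 },
  { t: 30, v: 10 }, // counter reset between t=15 and t=30
  { t: 45, v: 40 },
];
console.log(simpleRate(windowSamples)); // (30 + 10 + 30) / 45 ≈ 1.556
```

Handling resets is what lets a counter-based metric survive process restarts without producing large negative spikes.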

TimescaleDB (2017)

A time series extension built on PostgreSQL:

CREATE TABLE metrics (
  time TIMESTAMPTZ NOT NULL,
  device_id INTEGER,
  temperature DOUBLE PRECISION
);

SELECT create_hypertable('metrics', 'time');

ALTER TABLE metrics SET (
  timescaledb.compress,
  timescaledb.compress_segmentby = 'device_id'
);

Strengths

  • Full SQL support
  • Transactions and JOINs
  • A rich ecosystem
  • Automatic partitioning (chunks)

TSDB timeline

Core Techniques in Depth

Data model: Metric + Tags + Fields

The data model most modern TSDBs have converged on:

interface TimeSeriesDataPoint {
  timestamp: number;
  metric: string;
  tags: Record<string, string>;
  fields: Record<string, number>;
}

const dataPoint: TimeSeriesDataPoint = {
  timestamp: 1609459200000000000,
  metric: "system_metrics",
  tags: { 
    host: "server1", 
    region: "us-east",
    env: "production"
  },
  fields: {
    cpu_usage: 75.5,
    memory_used: 8192,
    disk_io: 1024
  }
};

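Design trade-offs

  • Tags: low cardinality, indexed, used for filtering and grouping
  • Fields: high cardinality, not indexed, hold the actual measurements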

Storage optimization: columnar storage and compression

Advantages of columnar storage

// Row-oriented storage (traditional databases)
interface RowStore {
  rows: Array<{
    timestamp: number;
    cpu: number;
    memory: number;
    disk: number;
  }>;
}

// Column-oriented storage (time series databases)
interface ColumnStore {
  timestamps: number[];
  cpu: number[];
  memory: number[];
  disk: number[];
}

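Advantages

  1. Less I/O: only the columns a query needs are read
  2. Better compression: values of the same type are stored contiguously
  3. SIMD-friendly: same-typed data can be processed in batches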
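
The first advantage can be sketched in a few lines: once rows are pivoted into columns, an aggregate over one metric touches a single array. `Row`, `Columns`, `toColumns`, and `meanOf` are hypothetical names for this illustration, and the in-memory arrays stand in for on-disk column files.

```typescript
interface Row { timestamp: number; cpu: number; memory: number; disk: number }
interface Columns { timestamps: number[]; cpu: number[]; memory: number[]; disk: number[] }

// Pivot row-oriented records into one array per column.
function toColumns(rows: Row[]): Columns {
  return {
    timestamps: rows.map((r) => r.timestamp),
    cpu: rows.map((r) => r.cpu),
    memory: rows.map((r) => r.memory),
    disk: rows.map((r) => r.disk),
  };
}

// An aggregate over one metric scans only that column.
function meanOf(column: number[]): number {
  return column.reduce((sum, v) => sum + v, 0) / column.length;
}

const columns = toColumns([
  { timestamp: 1609459200, cpu: 70, memory: 8192, disk: 1024 },
  { timestamp: 1609459260, cpu: 80, memory: 8200, disk: 1030 },
  { timestamp: 1609459320, cpu: 90, memory: 8210, disk: 1001 },
]);
console.log(meanOf(columns.cpu)); // 80
```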

Gorilla timestamp compression

The compression algorithm introduced by Facebook's Gorilla system:

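class GorillaTimestampCompression {
  // Delta-of-delta encoding: regularly spaced timestamps cost one bit each.
  // Simplification: the first delta is encoded like any other delta-of-delta.
  compress(timestamps: number[]): Uint8Array {
    const bits: boolean[] = [];
    if (timestamps.length === 0) return new Uint8Array(0);

    // The first timestamp is stored in full.
    this.writeBits(bits, timestamps[0], 64);

    let prevTimestamp = timestamps[0];
    let prevDelta = 0;

    for (let i = 1; i < timestamps.length; i++) {
      const timestamp = timestamps[i];
      const delta = timestamp - prevTimestamp;
      const deltaOfDelta = delta - prevDelta;

      // Each case writes a prefix code, then the value in a fixed width.
      if (deltaOfDelta === 0) {
        bits.push(false);
      } else if (-63 <= deltaOfDelta && deltaOfDelta <= 64) {
        bits.push(true, false);
        this.writeBits(bits, deltaOfDelta, 7);
      } else if (-255 <= deltaOfDelta && deltaOfDelta <= 256) {
        bits.push(true, true, false);
        this.writeBits(bits, deltaOfDelta, 9);
      } else if (-2047 <= deltaOfDelta && deltaOfDelta <= 2048) {
        bits.push(true, true, true, false);
        this.writeBits(bits, deltaOfDelta, 12);
      } else {
        bits.push(true, true, true, true);
        this.writeBits(bits, deltaOfDelta, 32);
      }

      prevTimestamp = timestamp;
      prevDelta = delta;
    }

    return this.bitsToBytes(bits);
  }

  // Appends the low `width` bits of `value`, most significant bit first.
  // Negative values use two's complement. The upper bound of each range
  // (64, 256, 2048) shares its bit pattern with the unused lower bound,
  // so a decoder has to map that pattern back to the positive value.
  private writeBits(bits: boolean[], value: number, width: number): void {
    const v = BigInt.asUintN(width, BigInt(value));
    for (let i = width - 1; i >= 0; i--) {
      bits.push(((v >> BigInt(i)) & 1n) === 1n);
    }
  }

  // Packs the bit list into bytes, padding the last byte with zeros.
  private bitsToBytes(bits: boolean[]): Uint8Array {
    const bytes = new Uint8Array(Math.ceil(bits.length / 8));
    bits.forEach((bit, i) => {
      if (bit) bytes[i >> 3] |= 0x80 >> (i & 7);
    });
    return bytes;
  }
}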

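Compression results

  • Raw: 64 bits per timestamp
  • Compressed: about 1-2 bits per timestamp when samples arrive at regular intervals
  • Compression ratio: roughly 30-40x for such series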
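
The figures above can be sanity-checked by counting bits instead of writing them. The sketch below applies the same prefix-code widths as the encoder to a perfectly regular series; `encodedTimestampBits` is a hypothetical helper, and real data with jitter would land closer to the ratios quoted above.

```typescript
// Counts the bits a delta-of-delta encoder would emit, using the
// prefix-code widths 1, 2+7, 3+9, 4+12 and 4+32.
function encodedTimestampBits(timestamps: number[]): number {
  if (timestamps.length === 0) return 0;
  let bits = 64; // first timestamp stored in full
  let prevDelta = 0;
  for (let i = 1; i < timestamps.length; i++) {
    const delta = timestamps[i] - timestamps[i - 1];
    const dod = delta - prevDelta;
    if (dod === 0) bits += 1;
    else if (dod >= -63 && dod <= 64) bits += 2 + 7;
    else if (dod >= -255 && dod <= 256) bits += 3 + 9;
    else if (dod >= -2047 && dod <= 2048) bits += 4 + 12;
    else bits += 4 + 32;
    prevDelta = delta;
  }
  return bits;
}

// 1000 samples, one every 60 seconds.
const regular: number[] = [];
for (let i = 0; i < 1000; i++) regular.push(1609459200 + i * 60);

const totalBits = encodedTimestampBits(regular);
console.log(totalBits);                  // 1071
console.log(totalBits / regular.length); // 1.071 bits per timestamp
```

Here the total is 64 bits for the first timestamp, 9 bits for the first delta, and 1 bit for each of the remaining 998 timestamps.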

Gorilla value compression

Compressing floating-point values:

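class GorillaValueCompression {
  // XOR encoding: consecutive float64 values usually differ in only a few bits.
  // writeBits(bits, value: bigint, width) and bitsToBytes(bits) are assumed
  // bit-packing helpers (most significant bit first), omitted for brevity.
  compress(values: number[]): Uint8Array {
    const bits: boolean[] = [];
    if (values.length === 0) return new Uint8Array(0);

    // The first value is stored in full.
    let prevValue = this.float64ToUint64(values[0]);
    this.writeBits(bits, prevValue, 64);

    // No window exists yet, so the first non-zero XOR must define one.
    let prevLeadingZeros = 65;
    let prevTrailingZeros = 65;

    for (let i = 1; i < values.length; i++) {
      const currentValue = this.float64ToUint64(values[i]);
      const xor = prevValue ^ currentValue;

      if (xor === 0n) {
        bits.push(false); // identical value: a single 0 bit
      } else {
        bits.push(true);

        // Leading zeros are stored in 5 bits, so cap them at 31.
        const leadingZeros = Math.min(this.countLeadingZeros(xor), 31);
        const trailingZeros = this.countTrailingZeros(xor);

        if (leadingZeros >= prevLeadingZeros &&
            trailingZeros >= prevTrailingZeros) {
          // The changed bits fit inside the previous window: reuse it.
          bits.push(false);
          const meaningfulBits = 64 - prevLeadingZeros - prevTrailingZeros;
          this.writeBits(bits, xor >> BigInt(prevTrailingZeros), meaningfulBits);
        } else {
          // Start a new window: 5 bits of leading zeros, 6 bits of length.
          bits.push(true);
          this.writeBits(bits, BigInt(leadingZeros), 5);
          const meaningfulBits = 64 - leadingZeros - trailingZeros;
          // The length is 1..64; store length - 1 so it fits in 6 bits.
          this.writeBits(bits, BigInt(meaningfulBits - 1), 6);
          this.writeBits(bits, xor >> BigInt(trailingZeros), meaningfulBits);

          prevLeadingZeros = leadingZeros;
          prevTrailingZeros = trailingZeros;
        }
      }

      prevValue = currentValue;
    }

    return this.bitsToBytes(bits);
  }

  // Reinterprets the IEEE 754 bits of a float64 as an unsigned 64-bit integer.
  private float64ToUint64(value: number): bigint {
    const view = new DataView(new ArrayBuffer(8));
    view.setFloat64(0, value);
    return view.getBigUint64(0);
  }

  // Both counters are only called with a non-zero argument.
  private countLeadingZeros(x: bigint): number {
    return 64 - x.toString(2).length;
  }

  private countTrailingZeros(x: bigint): number {
    let n = 0;
    while (((x >> BigInt(n)) & 1n) === 0n) n++;
    return n;
  }
}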

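Results in practice

  • Raw: 16 bytes per data point (8-byte timestamp plus 8-byte value)
  • Compressed: 1.37 bytes per data point on average, as reported in the Gorilla paper
  • Compression ratio: about 12x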
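
Why XOR works well is easier to see on a concrete pair of values. The sketch below XORs the IEEE 754 bit patterns of two floats and reports how many bits actually carry information. `floatWords` and `xorWindow` are hypothetical helpers, and the 64-bit pattern is handled as two 32-bit words so that plain `number` arithmetic is enough.

```typescript
// Split a float64 into its high and low 32-bit words (big-endian order).
function floatWords(x: number): [number, number] {
  const view = new DataView(new ArrayBuffer(8));
  view.setFloat64(0, x);
  return [view.getUint32(0), view.getUint32(4)];
}

// Leading zeros, trailing zeros and meaningful bits of (bits(a) XOR bits(b)).
function xorWindow(a: number, b: number): { leading: number; trailing: number; meaningful: number } {
  const [aHi, aLo] = floatWords(a);
  const [bHi, bLo] = floatWords(b);
  const hi = (aHi ^ bHi) >>> 0;
  const lo = (aLo ^ bLo) >>> 0;
  if (hi === 0 && lo === 0) return { leading: 64, trailing: 64, meaningful: 0 };

  // Count trailing zeros of a non-zero 32-bit word by isolating its lowest set bit.
  const ctz32 = (w: number): number => 31 - Math.clz32(w & -w);
  const leading = hi !== 0 ? Math.clz32(hi) : 32 + Math.clz32(lo);
  const trailing = lo !== 0 ? ctz32(lo) : 32 + ctz32(hi);
  return { leading, trailing, meaningful: 64 - leading - trailing };
}

console.log(xorWindow(12, 24));     // { leading: 11, trailing: 52, meaningful: 1 }
console.log(xorWindow(15.5, 15.5)); // { leading: 64, trailing: 64, meaningful: 0 }
```

Going from 12.0 to 24.0 changes a single exponent bit, so only one meaningful bit has to be stored after the control bits. An unchanged value costs one bit in total.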

Index structure: the inverted index

To find the series that carry a given tag efficiently, TSDBs use an inverted index:

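// Minimal metadata kept per series (assumed shape).
interface SeriesMetadata {
  id: number;
  tags: Record<string, string>;
}

class InvertedIndex {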
  private index: Map<string, Map<string, Set<number>>> = new Map();
  private series: Map<number, SeriesMetadata> = new Map();
  
  addSeries(id: number, tags: Record<string, string>): void {
    this.series.set(id, { id, tags });
    
    for (const [key, value] of Object.entries(tags)) {
      if (!this.index.has(key)) {
        this.index.set(key, new Map());
      }
      if (!this.index.get(key)!.has(value)) {
        this.index.get(key)!.set(value, new Set());
      }
      this.index.get(key)!.get(value)!.add(id);
    }
  }
  
  query(filters: Record<string, string>): Set<number> {
    const postings: Set<number>[] = [];
    
    for (const [key, value] of Object.entries(filters)) {
      const ids = this.index.get(key)?.get(value);
      if (ids) {
        postings.push(ids);
      } else {
        return new Set();
      }
    }
    
    return this.intersect(postings);
  }
  
  private intersect(sets: Set<number>[]): Set<number> {
    if (sets.length === 0) return new Set();
    // Copy so callers cannot mutate the index's own posting set.
    if (sets.length === 1) return new Set(sets[0]);
    
    sets.sort((a, b) => a.size - b.size);
    
    const result = new Set(sets[0]);
    for (let i = 1; i < sets.length; i++) {
      for (const id of result) {
        if (!sets[i].has(id)) {
          result.delete(id);
        }
      }
    }
    
    return result;
  }
}
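
The class above intersects hash sets. A common alternative, shown here as a separate sketch rather than as what any particular TSDB does, keeps each posting list as a sorted array of series IDs and intersects two lists with a linear merge. `intersectSorted` is a hypothetical name, and the IDs are made up.

```typescript
// Intersect two ascending lists of series IDs in O(a.length + b.length).
function intersectSorted(a: number[], b: number[]): number[] {
  const out: number[] = [];
  let i = 0;
  let j = 0;
  while (i < a.length && j < b.length) {
    if (a[i] === b[j]) {
      out.push(a[i]);
      i++;
      j++;
    } else if (a[i] < b[j]) {
      i++; // advance whichever side is behind
    } else {
      j++;
    }
  }
  return out;
}

// Hypothetical posting lists for two tag filters.
const hostWeb01 = [1, 4, 7, 9];        // host=web01
const regionUsEast1 = [2, 4, 9, 12];   // region=us-east-1
console.log(intersectSorted(hostWeb01, regionUsEast1)); // [4, 9]
```

Sorted arrays compress well and scan sequentially, which is why this layout tends to suit on-disk indexes better than hash sets.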

Inverted index visualization

Queries and Aggregation

Time-window aggregation

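// A single-valued sample: one field of a data point, flattened for aggregation.
interface Sample {
  timestamp: number;
  value: number;
  tags: Record<string, string>;
}

class TimeWindowAggregation {
  // Groups samples into fixed (tumbling) windows and reduces each to one value.
  aggregate(
    samples: Sample[],
    windowSize: number,
    aggFunc: (values: number[]) => number
  ): Sample[] {
    const result: Sample[] = [];
    const windows = this.createWindows(samples, windowSize);

    for (const window of windows) {
      const values = window.map(p => p.value);
      result.push({
        timestamp: window[0].timestamp,
        value: aggFunc(values),
        tags: window[0].tags
      });
    }

    return result;
  }

  // Buckets samples by floor(timestamp / windowSize); assumes time-sorted input.
  private createWindows(samples: Sample[], windowSize: number): Sample[][] {
    const windows = new Map<number, Sample[]>();
    for (const s of samples) {
      const key = Math.floor(s.timestamp / windowSize);
      if (!windows.has(key)) windows.set(key, []);
      windows.get(key)!.push(s);
    }
    return [...windows.values()];
  }

  mean(values: number[]): number {
    return values.reduce((a, b) => a + b, 0) / values.length;
  }

  // Nearest-rank percentile; returns NaN for an empty input.
  percentile(values: number[], p: number): number {
    if (values.length === 0) return NaN;
    const sorted = values.slice().sort((a, b) => a - b);
    const index = Math.max(0, Math.ceil((p / 100) * sorted.length) - 1);
    return sorted[index];
  }
}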

InfluxQL examples

-- Basic query
SELECT mean(cpu_usage) 
FROM metrics 
WHERE host = 'server1' 
  AND time >= now() - 1h 
GROUP BY time(5m);

-- Multiple aggregates
SELECT 
  mean(cpu_usage) AS avg_cpu,
  max(cpu_usage) AS max_cpu,
  percentile(cpu_usage, 95) AS p95_cpu
FROM metrics 
WHERE region = 'us-east'
  AND time >= now() - 24h
GROUP BY time(1h), host;

Advanced PromQL queries

# Request rate
rate(http_requests_total[5m])

# Error ratio
sum(rate(http_requests_total{status=~"5.."}[5m])) 
/ 
sum(rate(http_requests_total[5m]))

# 95th-percentile latency
histogram_quantile(0.95,
  sum(rate(http_request_duration_seconds_bucket[5m])) by (le)
)

# Anomaly detection: more than 3 standard deviations above the 1h mean
(
  node_cpu_usage
  - avg_over_time(node_cpu_usage[1h])
) / stddev_over_time(node_cpu_usage[1h]) > 3
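
The anomaly expression above is a z-score: the distance of the current value from the window mean, measured in standard deviations. A minimal sketch of the same arithmetic follows. `zScore` is a hypothetical helper, and it uses the population standard deviation, which is what `stddev_over_time` computes.

```typescript
// Z-score of `current` relative to the values in `window`.
// Returns null when the window is empty or has zero variance.
function zScore(window: number[], current: number): number | null {
  if (window.length === 0) return null;
  const mean = window.reduce((a, b) => a + b, 0) / window.length;
  const variance =
    window.reduce((a, b) => a + (b - mean) * (b - mean), 0) / window.length;
  const std = Math.sqrt(variance);
  return std === 0 ? null : (current - mean) / std;
}

const lastHour = [10, 12, 11, 9, 10, 12, 11, 9]; // made-up CPU readings
const z = zScore(lastHour, 20);
console.log(z !== null && z > 3); // true: 20 is far outside the usual range
```

A flat window has zero standard deviation, so the division is undefined. The sketch returns null in that case rather than flagging every deviation as an anomaly.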

Downsampling and data retention

Older data is automatically reduced in resolution, which saves storage and speeds up queries:

interface RetentionPolicy {
  name: string;
  duration: string;
  resolution: string;
  aggregation: string;
}

class DownsamplingEngine {
  private policies: RetentionPolicy[] = [
    { 
      name: "raw", 
      duration: "7d", 
      resolution: "1s",
      aggregation: "none"
    },
    { 
      name: "hourly", 
      duration: "90d", 
      resolution: "1h",
      aggregation: "mean"
    },
    { 
      name: "daily", 
      duration: "5y", 
      resolution: "1d",
      aggregation: "mean"
    }
  ];
  
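  // Each tier after the first is built from the tier before it
  // (raw -> hourly -> daily). readData, aggregateByWindow, writeData and
  // deleteOldData are assumed storage-layer helpers.
  async downsample(): Promise<void> {
    for (let i = 1; i < this.policies.length; i++) {
      const source = this.policies[i - 1];
      const target = this.policies[i];
      const now = Date.now();
      const cutoff = now - this.parseDuration(source.duration);

      // Read what the source tier still retains.
      const sourceData = await this.readData({
        timeRange: [cutoff, now],
        resolution: source.resolution
      });

      const downsampled = this.aggregateByWindow(
        sourceData,
        this.parseDuration(target.resolution),
        target.aggregation
      );

      await this.writeData(downsampled, target.name);
    }

    // Enforce every tier's retention window, including the raw tier.
    for (const policy of this.policies) {
      await this.deleteOldData(policy);
    }
  }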
}

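Results in practice

  • Raw data (1 s resolution, 7 days): 604,800 data points
  • Hourly data (1 h resolution, 90 days): 2,160 data points
  • Daily data (1 d resolution, 5 years): 1,825 data points

Storage savings: over 99% for long-term data, compared with keeping every raw point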
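
The savings figure follows from simple arithmetic, sketched below for a single series. The `Tier` type and `pointsIn` helper are hypothetical, and five years is taken as 1,825 days to match the numbers above.

```typescript
const DAY_SECONDS = 86400;

interface Tier {
  resolutionSeconds: number;
  retentionDays: number;
}

// Number of points one series holds in a tier.
const pointsIn = (t: Tier): number =>
  (t.retentionDays * DAY_SECONDS) / t.resolutionSeconds;

const tiers: Tier[] = [
  { resolutionSeconds: 1, retentionDays: 7 },         // raw
  { resolutionSeconds: 3600, retentionDays: 90 },     // hourly
  { resolutionSeconds: 86400, retentionDays: 1825 },  // daily
];

const tieredPoints = tiers.reduce((sum, t) => sum + pointsIn(t), 0);
const rawFiveYears = pointsIn({ resolutionSeconds: 1, retentionDays: 1825 });
const savings = 1 - tieredPoints / rawFiveYears;

console.log(tieredPoints);                 // 608785
console.log(rawFiveYears);                 // 157680000
console.log((savings * 100).toFixed(2));   // "99.61"
```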

Downsampling illustration

Distributed Architecture and Scalability

Sharding strategies

class ShardingStrategy {
  timeBasedSharding(timestamp: number, shardCount: number): number {
    const day = Math.floor(timestamp / (24 * 3600 * 1000));
    return day % shardCount;
  }
  
  seriesBasedSharding(seriesKey: string, shardCount: number): number {
    return this.consistentHash(seriesKey) % shardCount;
  }
  
  hybridSharding(
    timestamp: number,
    seriesKey: string,
    shardCount: number
  ): number {
    const timeComponent = Math.floor(timestamp / (7 * 24 * 3600 * 1000));
    const seriesComponent = this.consistentHash(seriesKey);
    return (timeComponent + seriesComponent) % shardCount;
  }
}
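
The class above leaves `consistentHash` undefined. A minimal runnable version of the hybrid strategy could look like the sketch below, which uses the FNV-1a hash as a stand-in (an assumption; any stable hash would do). Note that `hash % shardCount` is plain modulo placement: changing `shardCount` moves most keys, which a true consistent-hashing ring would avoid. `fnv1a` and `shardFor` are hypothetical names.

```typescript
// 32-bit FNV-1a over UTF-16 code units (identical to bytes for ASCII keys).
function fnv1a(key: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash >>> 0;
}

const WEEK_MS = 7 * 24 * 3600 * 1000;

// Hybrid placement: the series hash spreads writes across shards,
// and the weekly time bucket rotates each series to a new shard.
function shardFor(timestampMs: number, seriesKey: string, shardCount: number): number {
  const timeBucket = Math.floor(timestampMs / WEEK_MS);
  return (timeBucket + fnv1a(seriesKey)) % shardCount;
}

const t0 = 1609459200000;
const seriesKey = "cpu,host=server01,region=us-east";
const thisWeek = shardFor(t0, seriesKey, 8);
const nextWeek = shardFor(t0 + WEEK_MS, seriesKey, 8);
console.log(thisWeek, nextWeek); // next week's shard is (thisWeek + 1) % 8
```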

Replication and high availability

class ReplicationManager {
  private replicationFactor = 3;
  
  async write(dataPoint: DataPoint): Promise<void> {
    const shard = this.getShardForData(dataPoint);
    const replicas = this.getReplicasForShard(shard, this.replicationFactor);
    
    const writes = replicas.map(replica => 
      this.writeToReplica(replica, dataPoint)
    );
    
    const quorum = Math.floor(this.replicationFactor / 2) + 1;
    await this.waitForQuorum(writes, quorum);
  }
}
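
`waitForQuorum` is not defined in the class above. One way it could work, sketched here under the assumption that each replica write is a `Promise<void>`, is to resolve as soon as enough writes succeed and to reject as soon as a quorum has become impossible. `quorumSize` is a hypothetical helper that mirrors the formula in `write`.

```typescript
// Majority quorum for a given replication factor.
function quorumSize(replicationFactor: number): number {
  return Math.floor(replicationFactor / 2) + 1;
}

// Resolves once `quorum` writes succeed; rejects once too many have failed.
function waitForQuorum(writes: Promise<void>[], quorum: number): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (quorum <= 0) {
      resolve();
      return;
    }
    const maxFailures = writes.length - quorum;
    if (maxFailures < 0) {
      reject(new Error("fewer replicas than the required quorum"));
      return;
    }
    let succeeded = 0;
    let failed = 0;
    for (const write of writes) {
      write.then(
        () => {
          succeeded++;
          if (succeeded === quorum) resolve();
        },
        () => {
          failed++;
          if (failed > maxFailures) reject(new Error("quorum not reached"));
        }
      );
    }
  });
}

// Two of three replicas acknowledge, so the write is accepted.
waitForQuorum(
  [Promise.resolve(), Promise.reject(new Error("replica down")), Promise.resolve()],
  quorumSize(3)
).then(() => console.log("write acknowledged by a quorum"));
```

With a replication factor of 3 the quorum is 2, so the write tolerates one failed or slow replica without waiting for it.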

Performance Optimization and Best Practices

Cardinality control

class CardinalityOptimization {
  // Anti-pattern: unbounded IDs as tags create a new series per user and request.
  badExample(): void {
    const dataPoint = {
      metric: "api_latency",
      tags: {
        user_id: "12345",
        request_id: "abc-def-ghi"
      },
      value: 42
    };
  }
  
  // Better: tags come from small, bounded sets; unbounded values go in fields.
  goodExample(): void {
    const dataPoint = {
      metric: "api_latency",
      tags: {
        endpoint: "/api/users",
        method: "GET",
        status_code: "200"
      },
      fields: {
        latency_ms: 42,
        user_id: 12345
      }
    };
  }
}
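
The difference between the two examples shows up when the worst-case series count is estimated: it is the product of the number of distinct values per tag. The counts below are made-up assumptions, and `maxSeries` is a hypothetical helper.

```typescript
// Upper bound on series count: product of distinct values per tag.
function maxSeries(distinctValuesPerTag: Record<string, number>): number {
  return Object.keys(distinctValuesPerTag).reduce(
    (product, tag) => product * distinctValuesPerTag[tag],
    1
  );
}

// Assumed counts for illustration only.
const boundedTags = { endpoint: 50, method: 5, status_code: 10 };
const withUserId = { endpoint: 50, method: 5, status_code: 10, user_id: 1000000 };

console.log(maxSeries(boundedTags)); // 2500
console.log(maxSeries(withUserId));  // 2500000000
```

Adding one unbounded tag multiplies the index size by the number of users, which is why IDs of that kind belong in fields.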

Batched writes

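class BatchWriter {
  private buffer: DataPoint[] = [];
  private batchSize = 5000;
  private flushInterval = 1000; // milliseconds

  // `tsdb` is any client exposing writeBatch(); it is injected here
  // because the original snippet used this.tsdb without declaring it.
  constructor(private tsdb: { writeBatch(points: DataPoint[]): Promise<void> }) {
    setInterval(() => this.flush(), this.flushInterval);
  }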
  
  async write(dataPoint: DataPoint): Promise<void> {
    this.buffer.push(dataPoint);
    
    if (this.buffer.length >= this.batchSize) {
      await this.flush();
    }
  }
  
  private async flush(): Promise<void> {
    if (this.buffer.length === 0) return;
    
    const batch = this.buffer.splice(0, this.batchSize);
    
    try {
      await this.tsdb.writeBatch(batch);
    } catch (error) {
      console.error('Batch write failed', error);
      this.buffer.unshift(...batch);
    }
  }
}

Real-World Use Cases

Application performance monitoring (APM)

class APMMetrics {
  async trackRequest(req: Request, res: Response, duration: number): Promise<void> {
    await this.tsdb.write({
      measurement: "http_request",
      tags: {
        method: req.method,
        endpoint: this.normalizeEndpoint(req.path),
        status_code: res.statusCode.toString()
      },
      fields: {
        duration_ms: duration,
        // Header values are strings; convert so the field is always numeric.
        response_size: Number(res.get('content-length')) || 0
      },
      timestamp: Date.now()
    });
  }
}

IoT sensor data

class IoTDataCollector {
  async collectSensorData(deviceId: string, readings: SensorReadings): Promise<void> {
    await this.tsdb.write({
      measurement: "sensor_readings",
      tags: {
        device_id: deviceId,
        location: readings.location
      },
      fields: {
        temperature: readings.temperature,
        humidity: readings.humidity,
        pressure: readings.pressure
      },
      timestamp: Date.now()
    });
  }
  
  async detectAnomalies(deviceId: string): Promise<Anomaly[]> {
    const query = `
      SELECT 
        mean(temperature) - stddev(temperature) * 3 AS lower_bound,
        mean(temperature) + stddev(temperature) * 3 AS upper_bound
      FROM sensor_readings
      WHERE device_id = '${deviceId}'
        AND time >= now() - 24h
    `;
    
    const bounds = await this.tsdb.query(query);
    
    const anomalies = await this.tsdb.query(`
      SELECT * FROM sensor_readings
      WHERE device_id = '${deviceId}'
        AND time >= now() - 1h
        AND (temperature < ${bounds.lower_bound} 
          OR temperature > ${bounds.upper_bound})
    `);
    
    return anomalies;
  }
}

Financial market data

class MarketDataStore {
  async storeTickData(symbol: string, tick: TickData): Promise<void> {
    await this.tsdb.write({
      measurement: "market_ticks",
      tags: {
        symbol: symbol,
        exchange: tick.exchange
      },
      fields: {
        price: tick.price,
        volume: tick.volume,
        bid: tick.bid,
        ask: tick.ask
      },
      timestamp: tick.timestamp
    });
  }
}

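Product Comparison and Selection Guide

Feature           | InfluxDB                | Prometheus      | TimescaleDB
Data model        | Measurement + Tags      | Metric + Labels | SQL table
Query language    | InfluxQL/Flux           | PromQL          | SQL
Scalability       | Clustering (commercial) | Federation      | Distributed
Write throughput  | 100K-1M/s               | 10K-100K/s      | 100K-1M/s
Compression ratio | 10-20x                  | 3-5x            | 15-30x
Best fit          | General-purpose TSDB    | K8s monitoring  | SQL + time series

Throughput and compression figures are rough orders of magnitude; actual numbers depend heavily on hardware, schema, and data shape.

Recommendations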

class TSDBSelection {
  recommend(requirements: Requirements): string {
    if (requirements.needsSQL && requirements.needsJOINs) {
      return "TimescaleDB";
    }
    
    if (requirements.ecosystem === "Kubernetes") {
      return "Prometheus";
    }
    
    if (requirements.writeRate > 1000000) {
      return "VictoriaMetrics";
    }
    
    return "InfluxDB";
  }
}

Future Trends

Cloud-native and serverless

const tsdb = new ServerlessTSDB({
  autoScaling: true,
  payPerQuery: true,
  storage: "s3"
});

AI-driven query optimization

class AIQueryOptimizer {
  async optimize(query: string): Promise<string> {
    const pattern = await this.analyzeQuery(query);
    const optimized = await this.ml.optimizeQuery(pattern);
    return optimized;
  }
}

Real-time stream processing integration

class StreamingTSDB {
  async processStream(stream: EventStream): Promise<void> {
    stream
      .window(Duration.seconds(10))
      .aggregate((events) => this.computeMetrics(events))
      .filter((metric) => this.isAnomaly(metric))
      .forEach((anomaly) => this.alert(anomaly));
  }
}

Future of TSDB

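Conclusion

Over more than 20 years, time series databases have evolved from the simple RRDtool into distributed systems that handle petabytes of data and millions of writes per second.

Key technical advances

  • Columnar storage: 10-100x better I/O efficiency
  • Gorilla compression: about 12x on whole data points, and more on timestamps alone
  • Inverted indexes: millisecond-level tag lookups
  • Automatic downsampling: over 99% savings on long-term storage

What to consider when choosing a TSDB

  1. Write throughput: how many data points per second must be handled?
  2. Query patterns: range queries? aggregation? real time?
  3. Retention policy: how long must data be kept?
  4. Ecosystem integration: compatibility with your existing tool stack
  5. Operational complexity: can the team run a complex system?

As IoT, observability, and real-time analytics keep growing, time series databases will continue to evolve quickly toward higher performance, more intelligence, and better ease of use.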