// stack
github.com/influxdata/influxdb/tsdb.(*Store).Open at store.go:221
github.com/influxdata/influxdb/cmd/influxd/run.(*Server).Open at server.go:442
github.com/influxdata/influxdb/cmd/influxd/run.(*Command).Run at command.go:149
main.(*Main).Run at main.go:81
main.main at main.go:45
// server.go
func (s *Server) Open() error {
if err := s.TSDBStore.Open(); err != nil {
return fmt.Errorf("open tsdb store: %s", err)
}
}
Store 的 Open 操作会遍历 data 目录,为每个 db 创建一个 Index,为每个 Shard 创建一个 ShardIndex。
// store.go
func (s *Store) Open() error {
if err := s.loadShards(); err != nil {
return err
}
}
func (s *Store) loadShards() error {
for _, db := range dbDirs { // 每个 db 目录
idx, err := s.createIndexIfNotExists(db.Name()) // Index 是每个 db 创建一个
for _, rp := range rpDirs { // 每个 rp 目录
for _, sh := range shardDirs { // 每个 shard 目录
go func(db, rp, sh string) {
opt := s.EngineOptions
opt.InmemIndex = idx // 通过 opt 把 db 级别的 Index 传递给 ShardIndex
shard := NewShard(shardID, path, walPath, sfile, opt)
err = shard.Open() // ShardIndex 是每个 Shard 创建一个
}(db.Name(), rp.Name(), sh.Name())
}
}
}
}
func (s *Store) createIndexIfNotExists(name string) (interface{}, error) {
if idx := s.indexes[name]; idx != nil {
return idx, nil
}
idx, err := NewInmemIndex(name, sfile) // 即注册的创建 Index 的函数,见注册 Index 一节
return idx, nil
}
// shard.go
func (s *Shard) Open() error {
if err := func() error {
idx, err := NewIndex(s.id, s.database, ipath, seriesIDSet, s.sfile, s.options)
}()
}
func NewShard(id uint64, path string, walPath string, sfile *SeriesFile, opt EngineOptions) *Shard {
...
}
// index.go
func NewIndex(...) (Index, error) {
format := options.IndexVersion // 根据配置,拿到注册的创建 ShardIndex 的函数
fn := newIndexFuncs[format]
return fn(id, database, path, seriesIDSet, sfile, options), nil
}
ShardIndex 与 Index 是组合的关系,通过 opt 传递 Index,所以 ShardIndex 的大部分核心接口都是通过 Index 实现的。
ShardIndex
ShardIndex 的方法分为四类:
删除 series,支持单个和批量。
创建 series
SeriesIDSet 相关
仅仅是代理 Index 的方法
数据结构
主要存储了两个字段:
seriesIDSet:series Ids 的位图,通过 Roaring Bitmap 实现
measurements:measurement name 到 series count 的 map
type ShardIndex struct {
id uint64
*Index
seriesIDSet *tsdb.SeriesIDSet
measurements map[string]int // measurement name -> series 的数量
opt tsdb.EngineOptions
}
series: series key 到 series 的索引,series key = measurement + tag set
measurements:measurement name 到 measurement 的索引
type Index struct {
mu sync.RWMutex
database string
sfile *tsdb.SeriesFile
fieldset *tsdb.MeasurementFieldSet
measurements map[string]*measurement // key: measurement name
series map[string]*series // key: series key(measurement + tag set)
seriesSketch, seriesTSSketch estimator.Sketch
measurementsSketch, measurementsTSSketch estimator.Sketch
rebuildQueue sync.Mutex
}
// series 相关
type series struct {
mu sync.RWMutex
deleted bool
ID uint64
Measurement *measurement
Key string
Tags models.Tags
}
type Tags []Tag
type Tag struct {
Key []byte
Value []byte
}
// measurement 相关
type measurement struct {
Database string
Name string `json:"name,omitempty"`
NameBytes []byte
mu sync.RWMutex
fieldNames map[string]struct{}
seriesByID map[uint64]*series // key: series id
seriesByTagKeyValue map[string]*tagKeyValue // key: tag key
sortedSeriesIDs seriesIDs
dirty bool
}
type tagKeyValue struct {
mu sync.RWMutex
entries map[string]*tagKeyValueEntry // key: tag value
}
type tagKeyValueEntry struct {
m map[uint64]struct{} // key: series id
a seriesIDs // 有序的 series ids
}
// 适合做排序、、交集、并集等操作
type seriesIDs []uint64 // element: series id