Meta
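Meta data stores InfluxDB's internal system information: users, databases, retention policies (RP), shard metadata, continuous queries (CQ), subscriptions, and so on. The relevant configuration is: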
[meta]
  dir = ""
  retention-autocreate = true
  logging-enabled = true
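As a rough illustration of how these three keys could map onto a Go struct, here is a minimal sketch. `MetaConfig`, `NewMetaConfig`, and `Validate` are hypothetical names made up for this note, not the actual InfluxDB types. The defaults mirror the values shown above, and the rule that `dir` must be non-empty is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// MetaConfig is a hypothetical mirror of the [meta] section.
type MetaConfig struct {
	Dir                 string // dir: directory holding the meta file
	RetentionAutoCreate bool   // retention-autocreate
	LoggingEnabled      bool   // logging-enabled
}

// NewMetaConfig returns the defaults shown in the config excerpt.
func NewMetaConfig() MetaConfig {
	return MetaConfig{RetentionAutoCreate: true, LoggingEnabled: true}
}

// Validate rejects an empty dir (assumed rule for this sketch).
func (c MetaConfig) Validate() error {
	if c.Dir == "" {
		return errors.New("meta dir must be specified")
	}
	return nil
}

func main() {
	c := NewMetaConfig()
	fmt.Println(c.Validate() != nil) // true: dir is still empty

	c.Dir = "/tmp/influxdb/meta" // example path only
	fmt.Println(c.Validate() == nil) // true
}
```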

Data structures

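Meta data is organized in layers. The bottom (communication) layer is described with Protobuf; a single-node InfluxDB uses the Protobuf encoding to read and write meta data on disk.
The DAO-layer format is defined in meta.proto. The outermost message is shown below; it mainly stores node, database, user, and shard information.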
// services/meta/internal/meta.proto
message Data {
	required uint64 Term = 1;
	required uint64 Index = 2;
	required uint64 ClusterID = 3;

	repeated NodeInfo Nodes = 4;
	repeated DatabaseInfo Databases = 5;
	repeated UserInfo Users = 6;

	required uint64 MaxNodeID = 7;
	required uint64 MaxShardGroupID = 8;
	required uint64 MaxShardID = 9;

	// added for 0.10.0
	repeated NodeInfo DataNodes = 10;
	repeated NodeInfo MetaNodes = 11;
}

// COMMANDS-related messages
...
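The business-layer (DTO) format is broadly similar to the DAO format, and the two can be converted into each other: every struct has corresponding marshal and unmarshal methods: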
// services/meta/data.go
type Data struct {
	Term      uint64 // associated raft term
	Index     uint64 // associated raft index
	ClusterID uint64
	Databases []DatabaseInfo
	Users     []UserInfo

	// adminUserExists provides a constant time mechanism for determining
	// if there is at least one admin user.
	adminUserExists bool

	MaxShardGroupID uint64
	MaxShardID      uint64
}

func (data *Data) marshal() *internal.Data {
	pb := &internal.Data{
		Term:      proto.Uint64(data.Term),
		Index:     proto.Uint64(data.Index),
		ClusterID: proto.Uint64(data.ClusterID),

		MaxShardGroupID: proto.Uint64(data.MaxShardGroupID),
		MaxShardID:      proto.Uint64(data.MaxShardID),

		// Need this for reverse compatibility
		MaxNodeID: proto.Uint64(0),
	}

	pb.Databases = make([]*internal.DatabaseInfo, len(data.Databases))
	for i := range data.Databases {
		pb.Databases[i] = data.Databases[i].marshal()
	}

	pb.Users = make([]*internal.UserInfo, len(data.Users))
	for i := range data.Users {
		pb.Users[i] = data.Users[i].marshal()
	}

	return pb
}

func (data *Data) unmarshal(pb *internal.Data) {
	data.Term = pb.GetTerm()
	data.Index = pb.GetIndex()
	data.ClusterID = pb.GetClusterID()

	data.MaxShardGroupID = pb.GetMaxShardGroupID()
	data.MaxShardID = pb.GetMaxShardID()

	data.Databases = make([]DatabaseInfo, len(pb.GetDatabases()))
	for i, x := range pb.GetDatabases() {
		data.Databases[i].unmarshal(x)
	}

	data.Users = make([]UserInfo, len(pb.GetUsers()))
	for i, x := range pb.GetUsers() {
		data.Users[i].unmarshal(x)
	}

	data.adminUserExists = data.hasAdminUser()
}
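To make the DTO/DAO conversion concrete without pulling in the generated Protobuf code, the following is a minimal sketch of the same round trip. The `pbData` and `pbUser` types are hand-written stand-ins (assumptions of this note) for the generated structs; their pointer fields imitate how proto2 scalar fields are generated. The sketch also shows that a derived field such as `adminUserExists` is recomputed on unmarshal rather than serialized.

```go
package main

import "fmt"

// pbUser and pbData are stand-ins for the generated DAO structs.
type pbUser struct {
	Name  *string
	Admin *bool
}

type pbData struct {
	Index *uint64
	Users []*pbUser
}

// UserInfo and Data are the business-layer (DTO) types.
type UserInfo struct {
	Name  string
	Admin bool
}

type Data struct {
	Index           uint64
	Users           []UserInfo
	adminUserExists bool // derived, never serialized
}

func (u UserInfo) marshal() *pbUser {
	// u is a copy, so these pointers do not alias the caller's struct.
	return &pbUser{Name: &u.Name, Admin: &u.Admin}
}

func (u *UserInfo) unmarshal(pb *pbUser) {
	u.Name, u.Admin = *pb.Name, *pb.Admin
}

func (d *Data) marshal() *pbData {
	idx := d.Index
	pb := &pbData{Index: &idx, Users: make([]*pbUser, len(d.Users))}
	for i := range d.Users {
		pb.Users[i] = d.Users[i].marshal()
	}
	return pb
}

func (d *Data) unmarshal(pb *pbData) {
	d.Index = *pb.Index
	d.Users = make([]UserInfo, len(pb.Users))
	d.adminUserExists = false
	for i, x := range pb.Users {
		d.Users[i].unmarshal(x)
		// Recompute the derived flag from the decoded users.
		d.adminUserExists = d.adminUserExists || d.Users[i].Admin
	}
}

func main() {
	in := Data{Index: 42, Users: []UserInfo{{Name: "root", Admin: true}, {Name: "reader"}}}
	var out Data
	out.unmarshal(in.marshal())
	fmt.Println(out.Index, len(out.Users), out.Users[0].Name, out.adminUserExists)
	// prints: 42 2 root true
}
```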

Client

Client is the interface that the Meta module exposes: every external operation on meta data goes through Client. The Client is initialized when the service starts:
// services/meta/client.go
type Client struct {
	mu        sync.RWMutex
	closing   chan struct{}
	changed   chan struct{}
	cacheData *Data

	authCache map[string]authUser
}

func NewClient(config *Config) *Client {
	return &Client{
		cacheData: &Data{
			ClusterID: uint64(rand.Int63()),
			Index:     1,
		},
		closing:   make(chan struct{}),
		changed:   make(chan struct{}),
		authCache: make(map[string]authUser),
	}
}
The call stack for creating the Client during system initialization is:
github.com/influxdata/influxdb/services/meta.NewClient at client.go:73
github.com/influxdata/influxdb/cmd/influxd/run.NewServer at server.go:176
github.com/influxdata/influxdb/cmd/influxd/run.(*Command).Run at command.go:142
main.(*Main).Run at main.go:81
main.main at main.go:45
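Server represents one InfluxDB process. It holds a MetaClient field, and this MetaClient is exposed to the other modules.
The sections below follow the flow load data => query data => update data => persist data to show how Client is implemented.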

Loading data

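Right after the Server creates the MetaClient, it calls the MetaClient's Open method. Open reads the meta file from the meta directory on disk, deserializes it with protobuf into the DAO-layer data, and then converts that into the DTO-layer data: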
// cmd/influxd/run/server.go
func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
	s := &Server{
		MetaClient: meta.NewClient(c.Meta),
	}

	if err := s.MetaClient.Open(); err != nil {
		return nil, err
	}

	// ... (rest of the server setup elided)
	return s, nil
}

// services/meta/client.go
func (c *Client) Open() error {
	if err := c.Load(); err != nil {
		return err
	}
	return nil
}

func (c *Client) Load() error {
	file := filepath.Join(c.path, metaFile)

	f, err := os.Open(file)
	if err != nil {
		// A missing meta file just means a fresh instance.
		if os.IsNotExist(err) {
			return nil
		}
		return err
	}
	defer f.Close()

	data, err := ioutil.ReadAll(f)
	if err != nil {
		return err
	}

	if err := c.cacheData.UnmarshalBinary(data); err != nil {
		return err
	}
	return nil
}

func (data *Data) UnmarshalBinary(buf []byte) error {
	var pb internal.Data
	if err := proto.Unmarshal(buf, &pb); err != nil {
		return err
	}
	data.unmarshal(&pb) // see the unmarshal implementation in the data structures section above
	return nil
}
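The load path can be tried in isolation with a small stand-in. The sketch below is not the InfluxDB code: it uses `encoding/json` instead of protobuf so that it runs with the standard library only, the `Data` type is reduced to two fields, and the file name `meta.db` as well as the "missing file means fresh instance" behaviour are assumptions of this note.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

const metaFile = "meta.db" // assumed file name for this sketch

type Data struct {
	Index     uint64
	Databases []string
}

// load reads dir/meta.db into data. If the file does not exist,
// data is left untouched (a fresh instance keeps its defaults).
func load(dir string, data *Data) error {
	buf, err := os.ReadFile(filepath.Join(dir, metaFile))
	if errors.Is(err, fs.ErrNotExist) {
		return nil
	}
	if err != nil {
		return err
	}
	return json.Unmarshal(buf, data)
}

// demo loads once from an empty directory and once after a meta
// file has been written, returning both results.
func demo() (Data, Data, error) {
	dir, err := os.MkdirTemp("", "meta-sketch")
	if err != nil {
		return Data{}, Data{}, err
	}
	defer os.RemoveAll(dir)

	fresh := Data{Index: 1}
	if err := load(dir, &fresh); err != nil {
		return Data{}, Data{}, err
	}

	buf, err := json.Marshal(Data{Index: 7, Databases: []string{"mydb"}})
	if err != nil {
		return Data{}, Data{}, err
	}
	if err := os.WriteFile(filepath.Join(dir, metaFile), buf, 0o600); err != nil {
		return Data{}, Data{}, err
	}

	loaded := Data{Index: 1}
	err = load(dir, &loaded)
	return fresh, loaded, err
}

func main() {
	fresh, loaded, err := demo()
	fmt.Println(fresh.Index, loaded.Index, loaded.Databases, err)
}
```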

Querying data

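Once the data has been loaded from disk, it is available for the other modules to query. Client has many query methods, which are not all listed here. The query logic is simple; a few examples follow: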
func (c *Client) Database(name string) *DatabaseInfo {
	for _, d := range c.cacheData.Databases {
		if d.Name == name {
			return &d
		}
	}
	return nil // no database with that name
}

func (c *Client) Users() []UserInfo {
	users := c.cacheData.Users
	if users == nil {
		return []UserInfo{}
	}
	return users
}
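One detail of the `Database` lookup is easy to miss: `return &d` returns a pointer to the loop variable, which is a copy of the slice element, so a caller that modifies the result does not touch the cached data. The sketch below demonstrates this with simplified types. The read lock is an addition of this note (the excerpt above shows no locking), and the copy is shallow, so any slices inside a real `DatabaseInfo` would still be shared.

```go
package main

import (
	"fmt"
	"sync"
)

// DatabaseInfo is reduced to two fields for this sketch.
type DatabaseInfo struct {
	Name                   string
	DefaultRetentionPolicy string
}

type Client struct {
	mu        sync.RWMutex
	databases []DatabaseInfo
}

// Database returns a pointer to a copy of the matching entry,
// or nil if no database has that name.
func (c *Client) Database(name string) *DatabaseInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()

	for _, d := range c.databases {
		if d.Name == name {
			return &d // d is a copy of the slice element
		}
	}
	return nil
}

func main() {
	c := &Client{databases: []DatabaseInfo{
		{Name: "mydb", DefaultRetentionPolicy: "autogen"},
	}}

	db := c.Database("mydb")
	db.DefaultRetentionPolicy = "changed" // only modifies the copy

	fmt.Println(c.Database("mydb").DefaultRetentionPolicy) // autogen
	fmt.Println(c.Database("missing") == nil)              // true
}
```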

Updating data

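Updating data here covers operations such as dropping a database or creating a user. To keep the data consistent, updates are implemented as copy-on-write: each update first clones the data, modifies the clone, and finally replaces the old data with it.
Almost all update operations follow this pattern: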
// services/meta/client.go
func (c *Client) UpdateMethod(...) (...) {
	data := c.cacheData.Clone() // work on a copy of the data

	// do update

	c.commit(data) // replace the old data; this also persists it

	return ...
}

func (c *Client) commit(data *Data) error {
	data.Index++

	// Persist to disk before updating the in-memory data.
	// The persistence implementation is covered in the next section.
	if err := snapshot(c.path, data); err != nil {
		return err
	}

	c.cacheData = data

	close(c.changed)
	c.changed = make(chan struct{})

	return nil
}
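The clone/commit pattern, together with the `changed` channel that `commit` closes and recreates, can be exercised with a small self-contained sketch. The types are simplified, persistence is left out, and `WaitForDataChanged` and `Snapshot` are helper names assumed for this note. Closing a channel wakes every goroutine waiting on it, which is why `commit` closes the old channel and then installs a fresh one for the next change.

```go
package main

import (
	"fmt"
	"sync"
)

type Data struct {
	Index     uint64
	Databases []string
}

// Clone returns a copy that shares no slice storage with d.
func (d *Data) Clone() *Data {
	other := *d
	other.Databases = append([]string(nil), d.Databases...)
	return &other
}

type Client struct {
	mu        sync.RWMutex
	cacheData *Data
	changed   chan struct{}
}

func NewClient() *Client {
	return &Client{cacheData: &Data{Index: 1}, changed: make(chan struct{})}
}

// WaitForDataChanged returns a channel that is closed by the next commit.
func (c *Client) WaitForDataChanged() <-chan struct{} {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.changed
}

// Snapshot returns the current (immutable by convention) data.
func (c *Client) Snapshot() *Data {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData
}

// CreateDatabase is idempotent: an existing name causes no commit.
func (c *Client) CreateDatabase(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	data := c.cacheData.Clone()
	for _, n := range data.Databases {
		if n == name {
			return
		}
	}
	data.Databases = append(data.Databases, name)
	c.commit(data)
}

// commit swaps in the new data and notifies waiters.
// The caller must hold c.mu. Persistence is omitted in this sketch.
func (c *Client) commit(data *Data) {
	data.Index++
	c.cacheData = data
	close(c.changed)
	c.changed = make(chan struct{})
}

func main() {
	c := NewClient()
	old := c.Snapshot()
	ch := c.WaitForDataChanged()

	c.CreateDatabase("mydb")
	<-ch // returns immediately: commit closed the channel

	cur := c.Snapshot()
	fmt.Println(old.Index, len(old.Databases)) // 1 0
	fmt.Println(cur.Index, len(cur.Databases)) // 2 1
}
```

Readers that grabbed the old snapshot keep seeing a consistent view, because an update never modifies data that has already been published.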
For example, creating a database and creating a shard group look like this:
// services/meta/client.go
func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {
	data := c.cacheData.Clone()

	if db := data.Database(name); db != nil {
		return db, nil
	}

	if err := data.CreateDatabase(name); err != nil {
		return nil, err
	}

	// create default retention policy
	if c.retentionAutoCreate {
		rpi := DefaultRetentionPolicyInfo()
		if err := data.CreateRetentionPolicy(name, rpi, true); err != nil {
			return nil, err
		}
	}

	db := data.Database(name)

	if err := c.commit(data); err != nil {
		return nil, err
	}

	return db, nil
}

func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
	data := c.cacheData.Clone()
	if sg, _ := data.ShardGroupByTimestamp(database, policy, timestamp); sg != nil {
		return sg, nil
	}

	sgi, err := createShardGroup(data, database, policy, timestamp)
	if err != nil {
		return nil, err
	}

	if err := c.commit(data); err != nil {
		return nil, err
	}

	return sgi, nil
}
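Both methods share a "look up first, create only if missing" shape. For the shard group case, the sketch below shows one way such a get-or-create by timestamp could work. It is an illustration only: the `Policy` type and `GetOrCreate` are hypothetical, and aligning a new group by truncating the timestamp to the group duration is an assumption of this note, since the body of `createShardGroup` is not shown here.

```go
package main

import (
	"fmt"
	"time"
)

type ShardGroupInfo struct {
	ID        uint64
	StartTime time.Time
	EndTime   time.Time
}

// Contains reports whether t falls in [StartTime, EndTime).
func (sg ShardGroupInfo) Contains(t time.Time) bool {
	return !t.Before(sg.StartTime) && t.Before(sg.EndTime)
}

// Policy is a hypothetical, reduced retention policy.
type Policy struct {
	ShardGroupDuration time.Duration
	ShardGroups        []ShardGroupInfo
	maxID              uint64
}

// GetOrCreate returns the group covering t, creating it if needed.
// The second result reports whether a new group was created.
func (p *Policy) GetOrCreate(t time.Time) (ShardGroupInfo, bool) {
	for _, g := range p.ShardGroups {
		if g.Contains(t) {
			return g, false
		}
	}
	start := t.Truncate(p.ShardGroupDuration) // assumed alignment rule
	p.maxID++
	g := ShardGroupInfo{ID: p.maxID, StartTime: start, EndTime: start.Add(p.ShardGroupDuration)}
	p.ShardGroups = append(p.ShardGroups, g)
	return g, true
}

// demo creates groups for three timestamps and reports which calls
// created a new group, plus the final number of groups.
func demo() (bool, bool, bool, int) {
	p := &Policy{ShardGroupDuration: 24 * time.Hour}
	t := time.Date(2024, 1, 2, 15, 4, 5, 0, time.UTC)

	_, first := p.GetOrCreate(t)
	_, second := p.GetOrCreate(t.Add(time.Hour))       // same day: reused
	_, third := p.GetOrCreate(t.Add(24 * time.Hour))   // next day: new group
	return first, second, third, len(p.ShardGroups)
}

func main() {
	fmt.Println(demo()) // true false true 2
}
```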

Persisting data

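Persistence is triggered in two situations:
  • Any update operation, as described in the previous section.
  • Client initialization: if this node's Raft Index is 1, the data is persisted immediately.
The persistence logic is also fairly simple:
  • Create a temporary file.
  • Serialize the in-memory data to a byte slice with protobuf.
  • Write the byte slice to the temporary file.
  • Rename the temporary file to the final file.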
// services/meta/client.go
func snapshot(path string, data *Data) error {
	filename := filepath.Join(path, metaFile)
	tmpFile := filename + "tmp"

	f, err := os.Create(tmpFile)
	if err != nil {
		return err
	}
	defer f.Close()

	d, err := data.MarshalBinary()
	if err != nil {
		return err
	}

	if _, err := f.Write(d); err != nil {
		return err
	}

	if err = f.Sync(); err != nil {
		return err
	}

	// close file handle before renaming to support Windows
	if err = f.Close(); err != nil {
		return err
	}

	return file.RenameFile(tmpFile, filename)
}
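The write-to-temp-then-rename steps can be tried on their own with the standard library. The following is a minimal sketch, not the InfluxDB implementation: `writeFileAtomic` and `demo` are names made up for this note, `os.Rename` stands in for `file.RenameFile`, and removing the temporary file on failure is an extra the excerpt above does not show. The point of the rename is that readers see either the old file or the complete new one, never a half-written file.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeFileAtomic writes data to filename via a temporary file.
func writeFileAtomic(filename string, data []byte) (err error) {
	tmp := filename + "tmp"

	f, err := os.Create(tmp)
	if err != nil {
		return err
	}
	// On any failure, close the handle (harmless if already closed)
	// and remove the leftover temporary file.
	defer func() {
		if err != nil {
			f.Close()
			os.Remove(tmp)
		}
	}()

	if _, err = f.Write(data); err != nil {
		return err
	}
	// Flush to stable storage before the rename makes it visible.
	if err = f.Sync(); err != nil {
		return err
	}
	// Close before renaming; Windows cannot rename an open file.
	if err = f.Close(); err != nil {
		return err
	}
	return os.Rename(tmp, filename)
}

// demo writes twice to the same file and reports the final content
// and whether the temporary file is still present.
func demo() (string, bool, error) {
	dir, err := os.MkdirTemp("", "snapshot-sketch")
	if err != nil {
		return "", false, err
	}
	defer os.RemoveAll(dir)

	name := filepath.Join(dir, "meta.db") // example file name
	for _, s := range []string{"index=1", "index=2"} {
		if err := writeFileAtomic(name, []byte(s)); err != nil {
			return "", false, err
		}
	}

	buf, err := os.ReadFile(name)
	if err != nil {
		return "", false, err
	}
	_, statErr := os.Stat(name + "tmp")
	return string(buf), statErr == nil, nil
}

func main() {
	content, tmpLeft, err := demo()
	fmt.Println(content, tmpLeft, err)
}
```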