Meta
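The meta store holds InfluxDB's internal system information: users, databases, retention policies (RPs), shard metadata, continuous queries (CQs), subscriptions, and so on. The relevant configuration is: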
[meta]
dir = ""
retention-autocreate = true
logging-enabled = true
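Data structures
Meta data has a layered format. The bottom (communication) layer is described with Protobuf; a single-node InfluxDB uses the Protobuf encoding to read and write the data on disk.
The DAO-layer format is defined in meta.proto. The outermost message is shown below; it mainly stores node, database, user, and shard information.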
// services/meta/internal/meta.proto
message Data {
	required uint64 Term = 1;
	required uint64 Index = 2;
	required uint64 ClusterID = 3;
	repeated NodeInfo Nodes = 4;
	repeated DatabaseInfo Databases = 5;
	repeated UserInfo Users = 6;
	required uint64 MaxNodeID = 7;
	required uint64 MaxShardGroupID = 8;
	required uint64 MaxShardID = 9;
	// added for 0.10.0
	repeated NodeInfo DataNodes = 10;
	repeated NodeInfo MetaNodes = 11;
}
// COMMANDS-related definitions
...
The business-layer (DTO) format is largely the same as the DAO one, and the two can be converted into each other: every struct has matching marshal and unmarshal methods:
// services/meta/data.go
type Data struct {
	Term      uint64 // associated raft term
	Index     uint64 // associated raft index
	ClusterID uint64
	Databases []DatabaseInfo
	Users     []UserInfo
	// adminUserExists provides a constant time mechanism for determining
	// if there is at least one admin user.
	adminUserExists bool
	MaxShardGroupID uint64
	MaxShardID      uint64
}
func (data *Data) marshal() *internal.Data {
	pb := &internal.Data{
		Term:            proto.Uint64(data.Term),
		Index:           proto.Uint64(data.Index),
		ClusterID:       proto.Uint64(data.ClusterID),
		MaxShardGroupID: proto.Uint64(data.MaxShardGroupID),
		MaxShardID:      proto.Uint64(data.MaxShardID),
		// Need this for reverse compatibility
		MaxNodeID: proto.Uint64(0),
	}
	pb.Databases = make([]*internal.DatabaseInfo, len(data.Databases))
	for i := range data.Databases {
		pb.Databases[i] = data.Databases[i].marshal()
	}
	pb.Users = make([]*internal.UserInfo, len(data.Users))
	for i := range data.Users {
		pb.Users[i] = data.Users[i].marshal()
	}
	return pb
}
func (data *Data) unmarshal(pb *internal.Data) {
	data.Term = pb.GetTerm()
	data.Index = pb.GetIndex()
	data.ClusterID = pb.GetClusterID()
	data.MaxShardGroupID = pb.GetMaxShardGroupID()
	data.MaxShardID = pb.GetMaxShardID()
	data.Databases = make([]DatabaseInfo, len(pb.GetDatabases()))
	for i, x := range pb.GetDatabases() {
		data.Databases[i].unmarshal(x)
	}
	data.Users = make([]UserInfo, len(pb.GetUsers()))
	for i, x := range pb.GetUsers() {
		data.Users[i].unmarshal(x)
	}
	data.adminUserExists = data.hasAdminUser()
}
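To make the DTO/DAO conversion concrete, here is a minimal, dependency-free sketch of the same round trip. The pbUser and pbData types are hypothetical stand-ins for the generated internal.UserInfo and internal.Data types (they only mimic the pointer fields that proto2 code generation produces), and the structs are cut down to a few fields. It is an illustration of the pattern, not the InfluxDB implementation.

```go
package main

import "fmt"

// pbUser and pbData are hypothetical stand-ins for generated protobuf types.
// proto2 generates pointer fields for scalars, which is mimicked here.
type pbUser struct {
	Name  *string
	Admin *bool
}

type pbData struct {
	Index *uint64
	Users []*pbUser
}

// UserInfo and Data are simplified business-layer (DTO) structs.
type UserInfo struct {
	Name  string
	Admin bool
}

type Data struct {
	Index           uint64
	Users           []UserInfo
	adminUserExists bool // derived field, never serialized
}

// marshal copies the DTO fields into the wire-format struct.
func (u UserInfo) marshal() *pbUser {
	return &pbUser{Name: &u.Name, Admin: &u.Admin}
}

func (u *UserInfo) unmarshal(pb *pbUser) {
	u.Name, u.Admin = *pb.Name, *pb.Admin
}

func (d *Data) marshal() *pbData {
	idx := d.Index // copy so the wire struct does not alias the DTO
	pb := &pbData{Index: &idx, Users: make([]*pbUser, len(d.Users))}
	for i := range d.Users {
		pb.Users[i] = d.Users[i].marshal()
	}
	return pb
}

func (d *Data) unmarshal(pb *pbData) {
	d.Index = *pb.Index
	d.Users = make([]UserInfo, len(pb.Users))
	for i, x := range pb.Users {
		d.Users[i].unmarshal(x)
	}
	// Derived state is recomputed after loading rather than stored.
	d.adminUserExists = false
	for _, u := range d.Users {
		if u.Admin {
			d.adminUserExists = true
		}
	}
}

func main() {
	in := Data{Index: 7, Users: []UserInfo{{Name: "root", Admin: true}}}
	var out Data
	out.unmarshal(in.marshal())
	fmt.Println(out.Index, out.Users[0].Name, out.adminUserExists) // 7 root true
}
```

Note that adminUserExists is never written to the wire format; like the real unmarshal above, the sketch rebuilds it from the user list after decoding.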
Client
Client is the interface the Meta module exposes: every external operation on meta data goes through Client. It is initialized when the server starts:
// services/meta/client.go
type Client struct {
	mu        sync.RWMutex
	closing   chan struct{}
	changed   chan struct{}
	cacheData *Data
	authCache map[string]authUser
}
func NewClient(config *Config) *Client {
	return &Client{
		cacheData: &Data{
			ClusterID: uint64(rand.Int63()),
			Index:     1,
		},
		closing:   make(chan struct{}),
		changed:   make(chan struct{}),
		authCache: make(map[string]authUser),
	}
}
The call stack below shows where the Client is created during system initialization:
github.com/influxdata/influxdb/services/meta.NewClient at client.go:73
github.com/influxdata/influxdb/cmd/influxd/run.NewServer at server.go:176
github.com/influxdata/influxdb/cmd/influxd/run.(*Command).Run at command.go:142
main.(*Main).Run at main.go:81
main.main at main.go:45
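Server represents one InfluxDB process. It has a MetaClient field, and this MetaClient is handed to the other modules.
The following sections walk through the Client implementation in this order: loading data => querying data => updating data => persisting data.
Loading data
Right after Server creates the MetaClient, it calls the MetaClient's Open method. Open loads the meta file from the meta directory on disk, deserializes it with protobuf into the DAO-layer data, and then converts that into the DTO-layer data: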
// cmd/influxd/run/server.go
func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
	s := &Server{
		MetaClient: meta.NewClient(c.Meta),
	}
	if err := s.MetaClient.Open(); err != nil {
		return nil, err
	}
	// ... (remaining initialization elided in this excerpt)
	return s, nil
}
// services/meta/client.go
func (c *Client) Open() error {
	if err := c.Load(); err != nil {
		return err
	}
	return nil
}
func (c *Client) Load() error {
	file := filepath.Join(c.path, metaFile)
	f, err := os.Open(file)
	if err != nil {
		// A missing file is expected on a brand-new instance.
		if os.IsNotExist(err) {
			return nil
		}
		return err
	}
	defer f.Close()
	data, err := ioutil.ReadAll(f)
	if err != nil {
		return err
	}
	if err := c.cacheData.UnmarshalBinary(data); err != nil {
		return err
	}
	return nil
}
func (data *Data) UnmarshalBinary(buf []byte) error {
	var pb internal.Data
	if err := proto.Unmarshal(buf, &pb); err != nil {
		return err
	}
	data.unmarshal(&pb) // see the unmarshal implementation in the "Data structures" section
	return nil
}
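Querying data
Once loaded from disk, the data is available for every module to query. Client has many query methods, which are not all listed here. The query logic is simple, so only a couple of examples are shown: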
func (c *Client) Database(name string) *DatabaseInfo {
	for _, d := range c.cacheData.Databases {
		if d.Name == name {
			return &d // d is a copy of the cached entry
		}
	}
	return nil // not found
}
func (c *Client) Users() []UserInfo {
	users := c.cacheData.Users
	if users == nil {
		return []UserInfo{}
	}
	return users
}
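One detail of the Database lookup is worth a small experiment: the range variable is a copy, so the pointer handed to the caller does not point into the cached slice. The sketch below uses a cut-down Client whose databases field and read lock are assumptions for the example (the excerpt above shows no locking), and demonstrates that mutating the result leaves the cache untouched.

```go
package main

import (
	"fmt"
	"sync"
)

type DatabaseInfo struct {
	Name                   string
	DefaultRetentionPolicy string
}

// Client is a cut-down stand-in holding only what the lookup needs.
type Client struct {
	mu        sync.RWMutex
	databases []DatabaseInfo
}

// Database returns a copy of the matching entry, or nil when absent.
func (c *Client) Database(name string) *DatabaseInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()
	for _, d := range c.databases {
		if d.Name == name {
			d := d // explicit copy; the caller never receives cache memory
			return &d
		}
	}
	return nil
}

func main() {
	c := &Client{databases: []DatabaseInfo{{Name: "db0", DefaultRetentionPolicy: "autogen"}}}

	db := c.Database("db0")
	db.DefaultRetentionPolicy = "changed" // mutates the copy only

	fmt.Println(c.Database("db0").DefaultRetentionPolicy, c.Database("nope") == nil) // autogen true
}
```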
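Updating data
Updates here cover operations such as dropping a database or creating a user. To keep the data consistent, the implementation works on a copy: every update first clones the data, modifies the clone, and finally replaces the old data with it.
Almost every update operation follows this pattern: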
// services/meta/client.go
func (c *Client) UpdateMethod(...) (...) {
	data := c.cacheData.Clone() // clone the data
	// do update
	c.commit(data) // swap in the new data; this also persists it
	return ...
}
func (c *Client) commit(data *Data) error {
	data.Index++
	// Persist to disk before updating the in-memory data.
	// The persistence implementation is covered in the next section.
	if err := snapshot(c.path, data); err != nil {
		return err
	}
	c.cacheData = data
	close(c.changed)
	c.changed = make(chan struct{})
	return nil
}
Creating a database and creating a shard group, for example, look like this:
// services/meta/client.go
func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {
	data := c.cacheData.Clone()
	if db := data.Database(name); db != nil {
		return db, nil
	}
	if err := data.CreateDatabase(name); err != nil {
		return nil, err
	}
	// create default retention policy
	if c.retentionAutoCreate {
		rpi := DefaultRetentionPolicyInfo()
		if err := data.CreateRetentionPolicy(name, rpi, true); err != nil {
			return nil, err
		}
	}
	db := data.Database(name)
	if err := c.commit(data); err != nil {
		return nil, err
	}
	return db, nil
}
func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
	data := c.cacheData.Clone()
	if sg, _ := data.ShardGroupByTimestamp(database, policy, timestamp); sg != nil {
		return sg, nil
	}
	sgi, err := createShardGroup(data, database, policy, timestamp)
	if err != nil {
		return nil, err
	}
	if err := c.commit(data); err != nil {
		return nil, err
	}
	return sgi, nil
}
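The clone-then-swap pattern used by the update methods, together with the close-and-recreate trick on the changed channel, can be reproduced in a few lines. The sketch below is a simplified model and not the InfluxDB code: Data is reduced to two fields, the persistence step is left out, and the WaitForDataChanged accessor is an illustrative helper for this example.

```go
package main

import (
	"fmt"
	"sync"
)

type Data struct {
	Index     uint64
	Databases []string
}

// Clone returns a deep copy so edits never touch the published snapshot.
func (d *Data) Clone() *Data {
	other := *d
	other.Databases = append([]string(nil), d.Databases...)
	return &other
}

type Client struct {
	mu        sync.RWMutex
	changed   chan struct{}
	cacheData *Data
}

func NewClient() *Client {
	return &Client{changed: make(chan struct{}), cacheData: &Data{Index: 1}}
}

// WaitForDataChanged returns a channel that is closed by the next commit.
func (c *Client) WaitForDataChanged() <-chan struct{} {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.changed
}

// CreateDatabase edits a clone and then publishes it in one step.
func (c *Client) CreateDatabase(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	data := c.cacheData.Clone()
	data.Databases = append(data.Databases, name)
	data.Index++

	// A real implementation would persist data here, before publishing it.
	c.cacheData = data
	close(c.changed)                // wake every goroutine currently waiting
	c.changed = make(chan struct{}) // fresh channel for future waiters
}

func main() {
	c := NewClient()
	old := c.cacheData
	ch := c.WaitForDataChanged()

	c.CreateDatabase("db0")
	<-ch // already closed by the commit, so this does not block

	fmt.Println(len(old.Databases), len(c.cacheData.Databases), c.cacheData.Index) // 0 1 2
}
```

Because readers only ever see a fully built snapshot, a failed update can simply be dropped without any rollback. Closing a channel acts as a broadcast: every waiter is released at once, which a single send could not do.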
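Persisting data
A persist is triggered in two situations:
- Any update operation (see the previous section).
- When the Client is initialized: if this node's Raft index is 1, the data is persisted immediately.
The persistence logic is also simple:
1. Create a temporary file.
2. Serialize the in-memory data to a byte slice with protobuf.
3. Write the byte slice to the file.
4. Rename the temporary file to the real file.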
// services/meta/client.go
func snapshot(path string, data *Data) error {
	filename := filepath.Join(path, metaFile)
	tmpFile := filename + "tmp"
	f, err := os.Create(tmpFile)
	if err != nil {
		return err
	}
	defer f.Close()
	var d []byte
	if b, err := data.MarshalBinary(); err != nil {
		return err
	} else {
		d = b
	}
	if _, err := f.Write(d); err != nil {
		return err
	}
	if err = f.Sync(); err != nil {
		return err
	}
	// close file handle before renaming to support Windows
	if err = f.Close(); err != nil {
		return err
	}
	return file.RenameFile(tmpFile, filename)
}