Go语言实战构建高可用日志采集 Agent
背景与目标在分布式系统和微服务架构中,日志是排查问题、审计行为、监控状态的关键来源。运维与开发往往需要一个轻量级、可部署到每台机器上的日志采集 Agent,将本地日志可靠地采集并上报到日志聚合系统。目标功能
[*]支持监控并实时 tail 多个日志文件(支持通配符)。
[*]处理日志切割(rotation),保证数据不丢失。
[*]将日志按批次以 JSON 格式发送到远端(HTTP/Kafka/gRPC 可扩展)。
[*]支持并发发送、限流、指数退避重试、本地队列缓冲。
[*]可优雅停止,尽可能保证剩余日志送达。
适用场景
[*]小型/中型集群的轻量级日志采集。
[*]开发/调试环境下的快速搭建。
[*]自研日志管道的一部分。
技术选型
[*]语言:Go(并发模型天然适合)。
[*]文件 tail:github.com/hpcloud/tail(成熟、支持 rotation)。
[*]传输协议:HTTP POST + gzip + JSON(后端易于接入,生产可替换 Kafka/gRPC)。
[*]配置方式:命令行 flags(简洁直观,易扩展到 YAML/JSON 配置)。
[*]重试策略:指数退避(带上限,防止雪崩)。
注:hpcloud/tail 已经成熟稳定,能很好处理 logrotate 带来的文件重建问题。如果对断点续传有严格要求,可以扩展 offset 持久化。
项目结构(示例)
log-agent/
├─ main.go
├─ sender.go
├─ tailer.go
├─ go.mod为了方便快速上手,这里直接给出单文件完整示例(main.go)。
完整代码示例
// main.go
}
}
}
// runSender 聚合并发送日志,带重试
func runSender(ctx context.Context, id int, in <-chan LogRecord, endpoint string, batchSize int, batchWait time.Duration) {
httpClient := &http.Client{Timeout: 10 * time.Second}
buf := make([]LogRecord, 0, batchSize)
sendBatch := func(batch []LogRecord) error {
if len(batch) == 0 {
return nil
}
data, err := json.Marshal(batch)
if err != nil {
return err
}
var b bytes.Buffer
gw := gzip.NewWriter(&b)
if _, err := gw.Write(data); err != nil {
_ = gw.Close()
return err
}
_ = gw.Close()
req, _ := http.NewRequest("POST", endpoint, &b)
req.Header.Set("Content-Encoding", "gzip")
req.Header.Set("Content-Type", "application/json")
var attempt int
for {
attempt++
resp, err := httpClient.Do(req)
if err == nil {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return nil
}
err = fmt.Errorf("bad status: %s", resp.Status)
}
select {
case <-ctx.Done():
return fmt.Errorf("context canceled")
default:
}
if attempt >= 5 {
return err
}
sleep := time.Duration(500*(1<<uint(attempt-1))) * time.Millisecond
if sleep > 10*time.Second {
sleep = 10 * time.Second
}
time.Sleep(sleep)
}
}
timer := time.NewTimer(batchWait)
defer timer.Stop()
for {
select {
case <-ctx.Done():
_ = sendBatch(buf)
return
case rec, ok := <-in:
if !ok {
_ = sendBatch(buf)
return
}
buf = append(buf, rec)
if len(buf) >= batchSize {
_ = sendBatch(buf)
buf = buf[:0]
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(batchWait)
}
case <-timer.C:
if len(buf) > 0 {
_ = sendBatch(buf)
buf = buf[:0]
}
timer.Reset(batchWait)
}
}
}使用方法初始化模块并获取依赖:
go mod init example.com/log-agent
go get github.com/hpcloud/tail
go build -o log-agent main.go运行示例:
./log-agent -paths "/var/log/myapp/*.log" -endpoint "http://log-collector:8080/ingest" -batch 100 -workers 4建议:
[*]使用 systemd 管理 agent 生命周期。
[*]在 Kubernetes 中以 DaemonSet 或 sidecar 部署。
实践要点与注意事项
[*]日志切割:ReOpen: true 可处理 logrotate 产生的新文件;生产中建议配合 inode 校验 + offset 持久化,支持断点续传。
[*]传输安全:生产环境必须使用 HTTPS + 鉴权(API Key/mTLS)。
[*]后端吞吐:控制批量大小和速率,避免打爆后端;可扩展为本地磁盘队列(如 diskqueue)。
[*]结构化日志:应用尽量输出 JSON 格式日志,方便聚合与查询。
[*]监控与自检:建议加 Prometheus 指标接口,监控队列长度、发送失败率等。
[*]敏感信息:日志可能包含 PII/密码/token,采集前可做脱敏。
进一步优化思路
[*]增加 持久化队列(磁盘存储),保证断网/崩溃不丢日志。
[*]支持 多种传输后端(Kafka、gRPC、Elasticsearch、S3)。
[*]支持自动注入 标签信息(service/env/pod)。
[*]插件化解析器(nginx/app 自定义格式)。
[*]动态配置下发(配置中心 / Web UI 管理)。
总结本文从零实现了一个轻量级、高可用的日志采集 Agent:
[*]支持多文件采集与切割处理。
[*]提供批量发送、指数退避重试。
[*]使用 Go 并发与 channel,让代码简洁高效。
把这个 Agent 部署到每台节点,就能为日志聚合系统提供一个可靠、可扩展的数据源。如果你正在构建日志管道或想学习 Go 并发实践,这将是一个很好的实战案例。
页:
[1]