|
|
这里或许是互联网从业者的最后一片净土,随客社区期待您的加入!
您需要 登录 才可以下载或查看,没有账号?立即注册
×
背景与目标在分布式系统和微服务架构中,日志是排查问题、审计行为、监控状态的关键来源。运维与开发往往需要一个轻量级、可部署到每台机器上的日志采集 Agent,将本地日志可靠地采集并上报到日志聚合系统。
目标功能- 支持监控并实时 tail 多个日志文件(支持通配符)。
- 处理日志切割(rotation),保证数据不丢失。
- 将日志按批次以 JSON 格式发送到远端(HTTP/Kafka/gRPC 可扩展)。
- 支持并发发送、限流、指数退避重试、本地队列缓冲。
- 可优雅停止,尽可能保证剩余日志送达。
适用场景- 小型/中型集群的轻量级日志采集。
- 开发/调试环境下的快速搭建。
- 自研日志管道的一部分。
技术选型- 语言:Go(并发模型天然适合)。
- 文件 tail:[url=]github.com/hpcloud/tail[/url](成熟、支持 rotation)。
- 传输协议:HTTP POST + gzip + JSON(后端易于接入,生产可替换 Kafka/gRPC)。
- 配置方式:命令行 flags(简洁直观,易扩展到 YAML/JSON 配置)。
- 重试策略:指数退避(带上限,防止雪崩)。
注:hpcloud/tail 已经成熟稳定,能很好处理 logrotate 带来的文件重建问题。如果对断点续传有严格要求,可以扩展 offset 持久化。
项目结构(示例)
- log-agent/
- ├─ main.go
- ├─ sender.go
- ├─ tailer.go
- ├─ go.mod
复制代码 为了方便快速上手,这里直接给出单文件完整示例(main.go)。
完整代码示例
- // main.go
- }
- }
- }
- // runSender 聚合并发送日志,带重试
- func runSender(ctx context.Context, id int, in <-chan LogRecord, endpoint string, batchSize int, batchWait time.Duration) {
- httpClient := &http.Client{Timeout: 10 * time.Second}
- buf := make([]LogRecord, 0, batchSize)
- sendBatch := func(batch []LogRecord) error {
- if len(batch) == 0 {
- return nil
- }
- data, err := json.Marshal(batch)
- if err != nil {
- return err
- }
- var b bytes.Buffer
- gw := gzip.NewWriter(&b)
- if _, err := gw.Write(data); err != nil {
- _ = gw.Close()
- return err
- }
- _ = gw.Close()
- req, _ := http.NewRequest("POST", endpoint, &b)
- req.Header.Set("Content-Encoding", "gzip")
- req.Header.Set("Content-Type", "application/json")
- var attempt int
- for {
- attempt++
- resp, err := httpClient.Do(req)
- if err == nil {
- io.Copy(io.Discard, resp.Body)
- resp.Body.Close()
- if resp.StatusCode >= 200 && resp.StatusCode < 300 {
- return nil
- }
- err = fmt.Errorf("bad status: %s", resp.Status)
- }
- select {
- case <-ctx.Done():
- return fmt.Errorf("context canceled")
- default:
- }
- if attempt >= 5 {
- return err
- }
- sleep := time.Duration(500*(1<<uint(attempt-1))) * time.Millisecond
- if sleep > 10*time.Second {
- sleep = 10 * time.Second
- }
- time.Sleep(sleep)
- }
- }
- timer := time.NewTimer(batchWait)
- defer timer.Stop()
- for {
- select {
- case <-ctx.Done():
- _ = sendBatch(buf)
- return
- case rec, ok := <-in:
- if !ok {
- _ = sendBatch(buf)
- return
- }
- buf = append(buf, rec)
- if len(buf) >= batchSize {
- _ = sendBatch(buf)
- buf = buf[:0]
- if !timer.Stop() {
- select {
- case <-timer.C:
- default:
- }
- }
- timer.Reset(batchWait)
- }
- case <-timer.C:
- if len(buf) > 0 {
- _ = sendBatch(buf)
- buf = buf[:0]
- }
- timer.Reset(batchWait)
- }
- }
- }
复制代码 使用方法初始化模块并获取依赖:
- go mod init example.com/log-agent
- go get github.com/hpcloud/tail
- go build -o log-agent main.go
复制代码 运行示例:
- ./log-agent -paths "/var/log/myapp/*.log" -endpoint "http://log-collector:8080/ingest" -batch 100 -workers 4
复制代码 建议:
- 使用 systemd 管理 agent 生命周期。
- 在 Kubernetes 中以 DaemonSet 或 sidecar 部署。
实践要点与注意事项- 日志切割:ReOpen: true 可处理 logrotate 产生的新文件;生产中建议配合 inode 校验 + offset 持久化,支持断点续传。
- 传输安全:生产环境必须使用 HTTPS + 鉴权(API Key/mTLS)。
- 后端吞吐:控制批量大小和速率,避免打爆后端;可扩展为本地磁盘队列(如 diskqueue)。
- 结构化日志:应用尽量输出 JSON 格式日志,方便聚合与查询。
- 监控与自检:建议加 Prometheus 指标接口,监控队列长度、发送失败率等。
- 敏感信息:日志可能包含 PII/密码/token,采集前可做脱敏。
进一步优化思路- 增加 持久化队列(磁盘存储),保证断网/崩溃不丢日志。
- 支持 多种传输后端(Kafka、gRPC、Elasticsearch、S3)。
- 支持自动注入 标签信息(service/env/pod)。
- 插件化解析器(nginx/app 自定义格式)。
- 动态配置下发(配置中心 / Web UI 管理)。
总结本文从零实现了一个轻量级、高可用的日志采集 Agent:
- 支持多文件采集与切割处理。
- 提供批量发送、指数退避重试。
- 使用 Go 并发与 channel,让代码简洁高效。
把这个 Agent 部署到每台节点,就能为日志聚合系统提供一个可靠、可扩展的数据源。如果你正在构建日志管道或想学习 Go 并发实践,这将是一个很好的实战案例。
|
|