kafka实现海量数据下的EFK架构优化升级


一、数据背景

在海量数据场景下,日志管理和分析是一项重要任务。为了解决这个问题,EFK 架构(Elasticsearch + Fluentd + Kibana)已经成为流行的选择。

然而,随着数据规模的增加,传统的 EFK 架构可能面临性能瓶颈和可用性挑战。为了提升架构的性能和可伸缩性,我们可以结合 Kafka 和 Logstash 对 EFK 架构进行优化升级。

首先,引入 Kafka 作为高吞吐量的消息队列是关键的一步。Kafka 可以接收和缓冲大量的日志数据,减轻 Elasticsearch 的压力,并提供更好的可用性和容错性。

然后,我们可以使用 Fluentd 或 Logstash 将日志数据发送到 Kafka 中。将 Kafka 视为中间件层,用于处理日志数据流。这样可以解耦 Fluentd 或 Logstash 和 Elasticsearch 之间的直接连接,提高整体的可靠性和灵活性。

通过 Logstash 的 Kafka 插件,我们可以将 Kafka 中的数据消费到 Logstash 中进行处理和转发。这样 Logstash 就负责从 Kafka 中获取数据,然后根据需要进行过滤、解析和转换,最终将数据发送到 Elasticsearch 进行存储和索引。

图片

Kafka 可以简单理解为一个“消息传递系统”,但它比传统的消息队列更强大、更灵活。它就像一个“数据的高速公路”,专门用来高效地传输和处理大量的实时数据。


1. Kafka 是什么?

Kafka 是一个分布式的消息系统,主要用于处理实时数据流。你可以把它想象成一个“快递中转站”:

  • 生产者(比如手机、传感器、网站等)把数据(消息)发送到 Kafka。
  • Kafka 负责存储这些数据,并把它们传递给需要的人(消费者)。
  • 消费者(比如数据分析系统、监控系统等)从 Kafka 获取数据并进行处理。

2. Kafka 的核心功能

a. 消息传递

  • Kafka 就像一个“邮局”,负责接收、存储和转发消息。
  • 生产者把消息发送到 Kafka,消费者从 Kafka 获取消息。

b. 数据存储

  • Kafka 会把消息持久化到磁盘,即使消费者暂时没有读取,消息也不会丢失。
  • 你可以设置消息的保存时间,比如保存 7 天,之后自动删除。

c. 实时处理

  • Kafka 支持实时数据流动,适合需要快速处理数据的场景,比如实时监控、实时分析等。

3. Kafka 的通俗比喻

比喻 1:快递中转站

  • 生产者:比如你在网上买了一件衣服,商家就是生产者,负责发货。
  • Kafka:快递中转站,负责接收商家的包裹,并把包裹传递给快递员。
  • 消费者:快递员,负责把包裹送到你家。

比喻 2:新闻广播

  • 生产者:新闻记者,负责采集新闻。
  • Kafka:新闻频道,负责把新闻发布给观众。
  • 消费者:观众,订阅新闻频道,获取最新的新闻。

4. Kafka 的优点

a. 高效传输

  • Kafka 可以同时处理大量的消息,每秒可以传输数百万条数据,速度非常快。

b. 数据不丢失

  • Kafka 会把消息持久化到磁盘,即使系统崩溃,数据也不会丢失。

c. 灵活扩展

  • 如果数据量变大,Kafka 可以通过增加服务器(Broker)来扩展,适应更大的数据处理需求。

d. 多用户共享

  • 多个消费者可以同时从 Kafka 获取数据,互不影响。

5. Kafka 的应用场景

a. 实时监控

  • 比如监控服务器的性能指标(CPU、内存、网络等),Kafka 可以实时传输这些数据。

b. 日志收集

  • 比如收集网站的访问日志,Kafka 可以把日志数据集中存储,方便后续分析。

c. 实时分析

  • 比如分析用户的点击行为,Kafka 可以实时传输用户的行为数据,供分析系统使用。

d. 消息通知

  • 比如电商平台的订单状态更新,Kafka 可以实时通知用户订单的最新状态。

6. Kafka 总结

Kafka 就像一个“数据的高速公路”,专门用来高效地传输和处理大量的实时数据。它有以下特点:

  • :每秒可以处理数百万条消息。
  • :数据不会丢失,即使系统崩溃也能恢复。
  • 灵活:可以根据需求扩展,适应不同的数据量。
  • 共享:多个用户可以同时使用,互不影响。

如果你需要处理大量的实时数据,Kafka 是一个非常好的选择!

二、Kafka部署配置

首先在 Kubernetes 集群中安装 Kafka,同样这里使用 Helm 进行安装:

$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm repo update

首先使用 helm pull 拉取 Chart 并解压:

$ helm pull bitnami/kafka --untar --version 17.2.3
$ cd kafka

这里面我们指定使用一个 StorageClass 来提供持久化存储,在 Chart 目录下面创建用于安装的 values 文件:

# values.yaml
## @section Persistence parameters
persistence:
enabled: true
storageClass: "nfs-storage"
accessModes:
  - ReadWriteOnce
size: 20Gi

mountPath: /bitnami/kafka

# 配置zk volumes
zookeeper:
enabled: true
persistence:
  enabled: true
  storageClass: "nfs-storage"
  accessModes:
    - ReadWriteOnce
  size: 20Gi

直接使用上面的 values 文件安装 kafka:

$ helm upgrade --install kafka -f values.yaml --namespace logging .
Release "kafka" does not exist. Installing it now.
NAME: kafka
LAST DEPLOYED: Fri Jun 30 17:48:51 2023
NAMESPACE: logging
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: kafka
CHART VERSION: 17.2.3
APP VERSION: 3.2.0

** Please be patient while the chart is being deployed **

Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:

  kafka.logging.svc.cluster.local

Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:

  kafka-0.kafka-headless.logging.svc.cluster.local:9092

To create a pod that you can use as a Kafka client run the following commands:

  kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:3.2.0-debian-10-r4 --namespace logging --command -- sleep infinity
  kubectl exec --tty -i kafka-client --namespace logging -- bash

  PRODUCER:
      kafka-console-producer.sh \

           --broker-list kafka-0.kafka-headless.logging.svc.cluster.local:9092 \
           --topic test

  CONSUMER:
      kafka-console-consumer.sh \

           --bootstrap-server kafka.logging.svc.cluster.local:9092 \
           --topic test \
           --from-beginning

安装完成后我们可以使用上面的提示来检查 Kafka 是否正常运行:

$ kubectl get pods -n logging -l app.kubernetes.io/instance=kafka
kafka-0             1/1     Running   0         7m58s
kafka-zookeeper-0   1/1     Running   0         7m58s

用下面的命令创建一个 Kafka 的测试客户端 Pod:

$ kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:3.2.0-debian-10-r4 --namespace logging --command -- sleep infinity
pod/kafka-client created

然后启动一个终端进入容器内部生产消息:

# 生产者
$ kubectl exec --tty -i kafka-client --namespace logging -- bash
I have no name!@kafka-client:/$ kafka-console-producer.sh --broker-list kafka-0.kafka-headless.logging.svc.cluster.local:9092 --topic test
>hello kafka on k8s
>

启动另外一个终端进入容器内部消费消息:

# 消费者
$ kubectl exec --tty -i kafka-client --namespace logging -- bash
I have no name!@kafka-client:/$ kafka-console-consumer.sh --bootstrap-server kafka.logging.svc.cluster.local:9092 --topic test --from-beginning
hello kafka on k8s

如果在消费端看到了生产的消息数据证明我们的 Kafka 已经运行成功了。

三、Fluentd 配置 Kafka

现在有了 Kafka,我们就可以将 Fluentd 的日志数据输出到 Kafka 了,只需要将 Fluentd 配置中的 <match> 更改为使用 Kafka 插件即可,但是在 Fluentd 中输出到 Kafka,需要使用到 fluent-plugin-kafka 插件,所以需要我们自定义下 Docker 镜像,最简单的做法就是在上面 Fluentd 镜像的基础上新增 Kafka 插件即可,Dockerfile 文件如下所示:

FROM quay.io/fluentd_elasticsearch/fluentd:v3.4.0
RUN echo "source 'https://mirrors.tuna.tsinghua.edu.cn/rubygems/'" > Gemfile && gem install bundler
RUN gem install fluent-plugin-kafka -v 0.17.5 --no-document

编译:

$ docker build -t harbor-local.kubernets.cn/library/fluentd-kafka:v0.17.5 .

$ docker push harbor-local.kubernets.cn/library/fluentd-kafka:v0.17.5

接下来替换 Fluentd 的 Configmap 对象中的 <match> 部分,如下所示:

# fluentd-configmap.yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-conf
  namespace: logging
data:
  ......
  output.conf: |-
    <match **>
      @id kafka
      @type kafka2
      @log_level info

      # list of seed brokers
      brokers kafka-0.kafka-headless.logging.svc.cluster.local:9092
      use_event_time true

      # topic settings
      topic_key k8slog
      default_topic messages  # 注意,kafka中消费使用的是这个topic
      # buffer settings
      <buffer k8slog>
        @type file
        path /var/log/td-agent/buffer/td
        flush_interval 3s
      </buffer>

      # data type settings
      <format>
        @type json
      </format>

      # producer settings
      required_acks -1
      compression_codec gzip

    </match>

然后替换运行的 Fluentd 镜像:

# fluentd-daemonset.yaml
image: harbor-local.kubernets.cn/library/fluentd-kafka:v0.17.5

直接更新 Fluentd 的 Configmap 与 DaemonSet 资源对象即可:

$ kubectl apply -f fluentd-configmap.yaml
$ kubectl apply -f fluentd-daemonset.yaml

更新成功后我们可以使用上面的测试 Kafka 客户端来验证是否有日志数据:

$ kubectl exec --tty -i kafka-client --namespace logging -- bash
I have no name!@kafka-client:/$ kafka-console-consumer.sh --bootstrap-server kafka.logging.svc.cluster.local:9092 --topic messages --from-beginning
{"stream":"stdout","docker":{},"kubernetes":{"container_name":"count","namespace_name":"default","pod_name":"counter","container_image":"busybox:latest","host":"node1","labels":{"logging":"true"}},"message":"43883: Tue Jul 2 12:16:30 UTC 2023\n"}
......

四、安装 Logstash

虽然数据从 Kafka 到 Elasticsearch 的方式多种多样,比如可以使用 Kafka Connect Elasticsearch Connector 来实现,我们这里还是采用更加流行的 Logstash 方案,上面我们已经将日志从 Fluentd 采集输出到 Kafka 中去了,接下来我们使用 Logstash 来连接 Kafka 与 Elasticsearch 间的日志数据。

首先使用 helm pull 拉取 Chart 并解压:

$ helm pull elastic/logstash --untar --version 7.17.3
$ cd logstash

同样在 Chart 根目录下面创建用于安装的 Values 文件,如下所示:

# values.yaml
fullnameOverride: logstash

persistence:
  enabled: true

logstashConfig:
  logstash.yml: |
    http.host: 0.0.0.0

# 要注意下格式
logstashPipeline:
  logstash.conf: |
    input { kafka { bootstrap_servers => "kafka-0.kafka-headless.logging.svc.cluster.local:9092" codec => json consumer_threads => 3 topics => ["messages"] } }
    filter {}  # 过滤配置(比如可以删除key、添加geoip等等)
    output { elasticsearch { hosts => [ "elasticsearch-master:9200" ] index => "logstash-k8s-%{+YYYY.MM.dd}" } stdout { codec => rubydebug } }

volumeClaimTemplate:
  accessModes: ["ReadWriteOnce"]
  storageClassName: nfs-storage
  resources:
    requests:
      storage: 30Gi

其中最重要的就是通过 logstashPipeline 配置 logstash 数据流的处理配置,通过 input 指定日志源 kafka 的配置,通过 output 输出到 Elasticsearch,同样直接使用上面的 Values 文件安装 logstash 即可:

$ helm upgrade --install logstash -f values.yaml --namespace logging .
Release "logstash" does not exist. Installing it now.
NAME: logstash
LAST DEPLOYED: Fri Jun 30 19:48:51 2023
NAMESPACE: logging
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
1. Watch all cluster members come up.
  $ kubectl get pods --namespace=logging -l app=logstash -w

安装启动完成后可以查看 logstash 的日志:

$ kubectl get pods --namespace=logging -l app=logstash
NAME         READY   STATUS    RESTARTS   AGE
logstash-0   1/1     Running   0          2m8s
$ kubectl logs -f logstash-0 -n logging
......
{
      "@version" => "1",
        "stream" => "stdout",
    "@timestamp" => 2023-06-30T11:09:16.889Z,
       "message" => "4672: Thu Jun  30 11:09:15 UTC 2023",
    "kubernetes" => {
         "container_image" => "docker.io/library/busybox:latest",
          "container_name" => "count",
                  "labels" => {
            "logging" => "true"
        },
                "pod_name" => "counter",
          "namespace_name" => "default",
                  "pod_ip" => "10.244.2.118",
                    "host" => "node2",
        "namespace_labels" => {
            "kubernetes_io/metadata_name" => "default"
        }
    },
        "docker" => {}
}

由于我们启用了 debug 日志调试,所以我们可以在 logstash 的日志中看到我们采集的日志消息,到这里证明我们的日志数据就获取成功了。

到这里我们就实现了一个使用 Fluentd+Kafka+Logstash+Elasticsearch+Kibana 的 Kubernetes 日志收集工具栈,这里我们完整的 Pod 信息如下所示:

$ kubectl get pods -n logging
NAME                             READY   STATUS    RESTARTS   AGE
elasticsearch-master-0           1/1     Running   0          2d1h
elasticsearch-master-1           1/1     Running   0          2d1h
elasticsearch-master-2           1/1     Running   0          2d1h
fluentd-82nk2                    1/1     Running   0          20m
fluentd-855f7                    1/1     Running   0          20m
fluentd-b5ljg                    1/1     Running   0          20m
fluentd-bz7s4                    1/1     Running   0          23m
fluentd-jw57d                    1/1     Running   6          19m
fluentd-mjs7g                    1/1     Running   0          19m
fluentd-mvxng                    1/1     Running   6          20m
kafka-0                          1/1     Running   0          30m
kafka-client                     1/1     Running   0          29m
kafka-zookeeper-0                1/1     Running   0          30m
kibana-kibana-6bb4864dd6-rj7zm   1/1     Running   0          2d1h
logstash-0                       1/1     Running   0          7m38s

当然在实际的工作项目中还需要我们根据实际的业务场景来进行参数性能调优以及高可用等设置,以达到系统的最优性能。

上面我们在配置 logstash 的时候是将日志输出到 "logstash-k8s-%{+YYYY.MM.dd}" 这个索引模式的,可能有的场景下只通过日期去区分索引不是很合理;

那么我们可以根据自己的需求去修改索引名称,比如可以根据我们的服务名称来进行区分,那么这个服务名称可以怎么来定义呢?

可以是 Pod 的名称或者通过 label 标签去指定,比如我们这里去做一个规范,要求需要收集日志的 Pod 除了需要添加 logging: true 这个标签之外,还需要添加一个 logIndex: <索引名> 的标签。

比如重新更新我们测试的 counter 应用:

apiVersion: v1
kind: Pod
metadata:
  name: counter
  labels:
    logging: "true" # 一定要具有该标签才会被采集
    logIndex: "zhdya"  # 指定索引名称
spec:
  containers:
    - name: count
      image: busybox
      args:
        [
          /bin/sh,
          -c,
          'i=0; while true; do echo "$i: $(date)"; i=$((i+1)); sleep 1; done',
        ]

然后重新更新 Logstash 的配置,修改 values 配置:

# ......
logstashPipeline:
  logstash.conf: |
    input { kafka { bootstrap_servers => "kafka-0.kafka-headless.logging.svc.cluster.local:9092" codec => json consumer_threads => 3 topics => ["messages"] } }
    filter {}  # 过滤配置(比如可以删除key、添加geoip等等)
    output { elasticsearch { hosts => [ "elasticsearch-master:9200" ] index => "k8s-%{[kubernetes][labels][logIndex]}-%{+YYYY.MM.dd}" } stdout { codec => rubydebug } }
# ......

logstash更新:

 $ helm upgrade --install logstash -f values.yaml --namespace logging .

使用上面的 values 值更新 logstash,正常更新后上面的 counter 这个 Pod 日志会输出到一个名为 k8s-zhdya-2023.07.05 的索引去。

这样我们就实现了自定义索引名称,当然你也可以使用 Pod 名称、容器名称、命名空间名称来作为索引的名称,这完全取决于你自己的需求。