Collecting Kubernetes Log Data with OpenTelemetry Collector

Previously we described how to collect metrics data from a Kubernetes cluster with the OpenTelemetry Collector; next we’ll take a look at how to collect log data from the cluster.

Explore OpenTelemetry Logs in Loki

Installing Loki

First we need to deploy Loki to collect the log data. We’ll again use the Helm chart for a quick deployment, but note that we don’t need to deploy any separate log collectors, since the OpenTelemetry Collector will collect the log data and send it to Loki.

$ helm repo add grafana https://grafana.github.io/helm-charts
$ helm repo update

Here we create a loki-values.yaml file to configure the Loki Helm Chart:

# loki-values.yaml
loki:
  commonConfig:
    replication_factor: 1
  auth_enabled: false
  storage:
    type: "filesystem"
singleBinary:
  replicas: 1
  persistence:
    enabled: true
    size: 10Gi
    storageClass: cfsauto
monitoring:
  lokiCanary:
    enabled: false
  selfMonitoring:
    grafanaAgent:
      installOperator: false
test:
  enabled: false
gateway:
  ingress:
    enabled: true
    ingressClassName: nginx
    tls: []
    hosts:
      - host: loki.k8s.local
        paths:
          - path: /
            pathType: Prefix

Then you can deploy Loki with a single command:

$ helm upgrade --install loki grafana/loki -f loki-values.yaml --namespace kube-otel
$ kubectl get pods -n kube-otel -l app.kubernetes.io/instance=loki
NAME                            READY   STATUS    RESTARTS   AGE
loki-0                          1/1     Running   0          3m52s
loki-gateway-5ffc9fbbf5-m5q75   1/1     Running   0          8m42s
$ kubectl get ingress -n kube-otel
NAME                 CLASS   HOSTS               ADDRESS       PORTS   AGE
loki-gateway         nginx   loki.k8s.local      10.98.12.94   80      11m
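
Before wiring up the Collector, it is worth a quick sanity check that Loki is up. One way (assuming Loki’s default HTTP port 3100 on the loki-0 pod) is to port-forward and hit the readiness endpoint, which should return ready:

$ kubectl port-forward -n kube-otel pod/loki-0 3100:3100
$ curl http://localhost:3100/ready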

Enabling the filelog receiver

Next we need to configure the OpenTelemetry Collector to send log data to Loki. First update the otel-collector-ds-values.yaml file: we need to add a loki exporter and enable the filelog receiver.

# otel-collector-ds-values.yaml
mode: daemonset

presets:
  hostMetrics:
    enabled: true
  kubernetesAttributes:
    enabled: true
  kubeletMetrics:
    enabled: true
  # Enable the filelog receiver
  logsCollection:
    enabled: true

config:
  exporters:
    loki:
      endpoint: http://loki-gateway/loki/api/v1/push
      timeout: 10s # timeout
      read_buffer_size: 200
      write_buffer_size: 100
      retry_on_failure: # Configuration Retries
        enabled: true
        initial_interval: 10s # initial interval
        max_interval: 60s # Maximum interval
        max_elapsed_time: 10m # Maximum time
      default_labels_enabled:
        exporter: false

  processors:
    resource:
      attributes:
        - action: insert
          key: loki.resource.labels
          value: k8s.namespace.name,k8s.pod.name,k8s.container.name

  service:
    pipelines:
      logs:
        exporters:
          - loki
        processors:
          - memory_limiter
          - k8sattributes
          - resource
          - batch

Then re-update the OpenTelemetry Collector DaemonSet.

$ helm upgrade --install opentelemetry-collector ./opentelemetry-collector -f otel-collector-ds-values.yaml --namespace kube-otel --create-namespace

Again, to view the full rendered configuration after the update, use the command kubectl get cm opentelemetry-collector-agent -n kube-otel -o yaml.

exporters:
  logging:
    loglevel: debug
  loki:
    endpoint: http://loki-gateway/loki/api/v1/push
    timeout: 10s
    read_buffer_size: 200
    write_buffer_size: 100
    retry_on_failure:
      enabled: true
      initial_interval: 10s
      max_interval: 60s
      max_elapsed_time: 10m
    default_labels_enabled:
      exporter: false
extensions:
  health_check: {}
  memory_ballast:
    size_in_percentage: 40
processors:
  batch: {}
  k8sattributes:
    extract:
      metadata:
        - k8s.namespace.name
        - k8s.deployment.name
        - k8s.statefulset.name
        - k8s.daemonset.name
        - k8s.cronjob.name
        - k8s.job.name
        - k8s.node.name
        - k8s.pod.name
        - k8s.pod.uid
        - k8s.pod.start_time
    filter:
      node_from_env_var: K8S_NODE_NAME
    passthrough: false
    pod_association:
      - sources:
          - from: resource_attribute
            name: k8s.pod.ip
      - sources:
          - from: resource_attribute
            name: k8s.pod.uid
      - sources:
          - from: connection
  memory_limiter:
    check_interval: 5s
    limit_percentage: 80
    spike_limit_percentage: 25
  resource:
    attributes:
      - action: insert
        key: loki.resource.labels
        value: k8s.namespace.name,k8s.pod.name,k8s.container.name
receivers:
  filelog:
    exclude:
      - /var/log/pods/kube-otel_opentelemetry-collector*_*/opentelemetry-collector/*.log
    include:
      - /var/log/pods/*/*/*.log
    include_file_name: false
    include_file_path: true
    operators:
      - id: get-format
        routes:
          - expr: body matches "^\\{"
            output: parser-docker
          - expr: body matches "^[^ Z]+ "
            output: parser-crio
          - expr: body matches "^[^ Z]+Z"
            output: parser-containerd
        type: router
      - id: parser-crio
        regex: ^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
        timestamp:
          layout: 2006-01-02T15:04:05.999999999Z07:00
          layout_type: gotime
          parse_from: attributes.time
        type: regex_parser
      - combine_field: attributes.log
        combine_with: ""
        id: crio-recombine
        is_last_entry: attributes.logtag == 'F'
        max_log_size: 102400
        output: extract_metadata_from_filepath
        source_identifier: attributes["log.file.path"]
        type: recombine
      - id: parser-containerd
        regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
        timestamp:
          layout: "%Y-%m-%dT%H:%M:%S.%LZ"
          parse_from: attributes.time
        type: regex_parser
      - combine_field: attributes.log
        combine_with: ""
        id: containerd-recombine
        is_last_entry: attributes.logtag == 'F'
        max_log_size: 102400
        output: extract_metadata_from_filepath
        source_identifier: attributes["log.file.path"]
        type: recombine
      - id: parser-docker
        output: extract_metadata_from_filepath
        timestamp:
          layout: "%Y-%m-%dT%H:%M:%S.%LZ"
          parse_from: attributes.time
        type: json_parser
      - id: extract_metadata_from_filepath
        parse_from: attributes["log.file.path"]
        regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$
        type: regex_parser
      - from: attributes.stream
        to: attributes["log.iostream"]
        type: move
      - from: attributes.container_name
        to: resource["k8s.container.name"]
        type: move
      - from: attributes.namespace
        to: resource["k8s.namespace.name"]
        type: move
      - from: attributes.pod_name
        to: resource["k8s.pod.name"]
        type: move
      - from: attributes.restart_count
        to: resource["k8s.container.restart_count"]
        type: move
      - from: attributes.uid
        to: resource["k8s.pod.uid"]
        type: move
      - from: attributes.log
        to: body
        type: move
    start_at: beginning
  otlp:
    protocols:
      grpc:
        endpoint: ${env:MY_POD_IP}:4317
      http:
        endpoint: ${env:MY_POD_IP}:4318
service:
  extensions:
    - health_check
    - memory_ballast
  pipelines:
    logs:
      exporters:
        - loki
      processors:
        - memory_limiter
        - k8sattributes
        - resource
        - batch
      receivers:
        - otlp
        - filelog
# Again, only the logs-related configuration is retained; the rest is omitted ...

We have added a new loki exporter and filelog receiver.

loki exporter

This exporter exports log data to Loki over HTTP. It can be configured with the following options, among others:

  • endpoint: the HTTP endpoint address for Loki (e.g. http://loki:3100/loki/api/v1/push).
  • default_labels_enabled (optional): allows disabling the mapping of the default labels exporter, job, instance, and level. If default_labels_enabled is omitted entirely, all default labels are added. If a particular label is omitted from default_labels_enabled, it is still added by default.

If all default labels are disabled and no other labels are added, log entries will be discarded, because at least one label must be present to successfully write a log record to Loki. The otelcol_lokiexporter_send_failed_due_to_missing_labels metric reports the number of log records discarded because no labels were specified.
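
For example, our configuration above only disables the exporter label. A sketch that also drops the level label while keeping job and instance (using the same loki-gateway endpoint) would look like this:

loki:
  endpoint: http://loki-gateway/loki/api/v1/push
  default_labels_enabled:
    exporter: false
    level: false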

The Loki exporter can convert OTLP resource and log attributes to Loki labels and index them. To do this, hints need to be configured that specify which attributes should be set as labels. The hints are themselves attributes and are ignored when exporting to Loki. The following example uses the attributes processor to hint the Loki exporter to set the event.domain attribute as a label, and the resource processor to hint the Loki exporter to set service.name as a label.

processors:
  attributes:
    actions:
      - action: insert
        key: loki.attribute.labels
        value: event.domain

  resource:
    attributes:
      - action: insert
        key: loki.resource.labels
        value: service.name

Default labels are always set unless disabled by the default_labels_enabled setting.

  • job=service.namespace/service.name
  • instance=service.instance.id
  • exporter=OTLP
  • level=severity

If both service.name and service.namespace are present, job=service.namespace/service.name is set. If service.name is present but service.namespace is not, job=service.name is set. If service.name is absent and service.namespace is present, the job label is not set. If service.instance.id is present, instance=service.instance.id is set; otherwise the instance label is not set.
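
As a quick illustration (the attribute values here are hypothetical), a log record with the resource attributes below would be written to Loki with the labels shown after them:

# Resource attributes on the log record
service.namespace: prod
service.name: checkout
service.instance.id: checkout-5d87f7b6c9-abcde

# Resulting default labels in Loki
job="prod/checkout"
instance="checkout-5d87f7b6c9-abcde"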

Our full configuration here is as follows:


loki:
  endpoint: http://loki-gateway/loki/api/v1/push
  timeout: 10s
  read_buffer_size: 200
  write_buffer_size: 100
  retry_on_failure:
    enabled: true
    initial_interval: 10s
    max_interval: 60s
    max_elapsed_time: 10m

We have configured the timeout, read/write buffer sizes, and retries here.

The read_buffer_size and write_buffer_size fields specify the sizes (in bytes) of the read and write buffers used by the exporter’s HTTP client when talking to Loki. These buffers cache data on the connection to improve sending efficiency and reliability.

The write buffer holds outgoing request data before it is flushed to the network; if the exporter produces more data than fits in the buffer, the data is written out in batches. The read buffer plays the same role for data coming back from the endpoint, such as HTTP responses.

By tuning these buffer sizes you can trade memory for throughput: if your cluster generates a large volume of logs, increasing read_buffer_size and write_buffer_size can improve the exporter’s throughput and efficiency, while on memory-constrained nodes or over a less reliable network, smaller buffers keep the exporter’s footprint and the amount of in-flight buffered data down.
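
For instance, a higher-throughput variant of the exporter configuration could raise both buffers, as in the sketch below; the buffer values are purely illustrative, not recommendations:

loki:
  endpoint: http://loki-gateway/loki/api/v1/push
  read_buffer_size: 4096
  write_buffer_size: 65536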

We also added a resource processor that hints the Loki exporter to convert k8s.namespace.name, k8s.pod.name, and k8s.container.name into Loki labels, so we can index and query them in Loki.

resource:
  attributes:
    - action: insert
      key: loki.resource.labels
      value: k8s.namespace.name,k8s.pod.name,k8s.container.name

filelog receiver

This receiver is used to collect and parse log data from files. It reads log entries from the specified files and feeds them into the Collector pipeline.

Our configuration for this receiver is shown below:

filelog:
  exclude:
    - /var/log/pods/kube-otel_opentelemetry-collector*_*/opentelemetry-collector/*.log
  include:
    - /var/log/pods/*/*/*.log
  include_file_name: false
  include_file_path: true
  operators:
    - id: get-format
      routes:
        - expr: body matches "^\\{"
          output: parser-docker
        - expr: body matches "^[^ Z]+ "
          output: parser-crio
        - expr: body matches "^[^ Z]+Z"
          output: parser-containerd
      type: router
    - id: parser-crio
      regex: ^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
      timestamp:
        layout: 2006-01-02T15:04:05.999999999Z07:00
        layout_type: gotime
        parse_from: attributes.time
      type: regex_parser
    - combine_field: attributes.log
      combine_with: ""
      id: crio-recombine
      is_last_entry: attributes.logtag == 'F'
      max_log_size: 102400
      output: extract_metadata_from_filepath
      source_identifier: attributes["log.file.path"]
      type: recombine
    - id: parser-containerd
      regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
      timestamp:
        layout: "%Y-%m-%dT%H:%M:%S.%LZ"
        parse_from: attributes.time
      type: regex_parser
    - combine_field: attributes.log
      combine_with: ""
      id: containerd-recombine
      is_last_entry: attributes.logtag == 'F'
      max_log_size: 102400
      output: extract_metadata_from_filepath
      source_identifier: attributes["log.file.path"]
      type: recombine
    - id: parser-docker
      output: extract_metadata_from_filepath
      timestamp:
        layout: "%Y-%m-%dT%H:%M:%S.%LZ"
        parse_from: attributes.time
      type: json_parser
    - id: extract_metadata_from_filepath
      parse_from: attributes["log.file.path"]
      regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$
      type: regex_parser
    - from: attributes.stream
      to: attributes["log.iostream"]
      type: move
    - from: attributes.container_name
      to: resource["k8s.container.name"]
      type: move
    - from: attributes.namespace
      to: resource["k8s.namespace.name"]
      type: move
    - from: attributes.pod_name
      to: resource["k8s.pod.name"]
      type: move
    - from: attributes.restart_count
      to: resource["k8s.container.restart_count"]
      type: move
    - from: attributes.uid
      to: resource["k8s.pod.uid"]
      type: move
    - from: attributes.log
      to: body
      type: move
  start_at: beginning

As you can see, the configuration is quite long. First, exclude lists the log files that should not be collected, and include specifies the files that should be collected. Since our Kubernetes cluster uses the containerd container runtime, the log files to collect are /var/log/pods/*/*/*.log. include_file_path controls whether the file path is added to the log.file.path attribute, and include_file_name controls whether the file name is added to the log.file.name attribute.

start_at indicates where in the file to start reading the logs at startup. The options are beginning or end, the default being end.

Then there’s the all-important operators property, which specifies how the log files are processed. Operators are the most basic unit of log processing: each operator fulfils a single responsibility, such as reading lines from a file or parsing JSON from a field. Operators are then chained together into a pipeline to achieve the desired result.

For example, a user can read log lines from a file using the file_input operator, send the results to a regex_parser operator that creates fields based on a regular expression, and finally send those results to a file_output operator that writes the log to a file on disk.

We start here by configuring a router operator:

id: get-format
routes:
  - expr: body matches "^\\{"
    output: parser-docker
  - expr: body matches "^[^ Z]+ "
    output: parser-crio
  - expr: body matches "^[^ Z]+Z"
    output: parser-containerd
type: router

This operator routes logs dynamically based on their content. In our case the containerd runtime produces log lines that match body matches "^[^ Z]+Z", so the data is routed to the parser-containerd operator.

id: parser-containerd
regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
timestamp:
  layout: "%Y-%m-%dT%H:%M:%S.%LZ"
  parse_from: attributes.time
type: regex_parser

parser-containerd is a regex_parser operator that parses the routed log lines with the specified regular expression and stores the results in the time, stream, logtag, and log attributes. It also parses the timestamp from the time attribute using the configured layout.
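
As an illustration, a hypothetical containerd log line such as the one below would be parsed into roughly the following attributes (the message text is made up):

# Raw containerd log line
2023-06-20T08:30:15.123456789Z stdout F hello from my app

# Attributes after parser-containerd
time: 2023-06-20T08:30:15.123456789Z
stream: stdout
logtag: F
log: hello from my app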

Next, successive logs are combined into a single log using the recombine operator.

combine_field: attributes.log
combine_with: ""
id: containerd-recombine
is_last_entry: attributes.logtag == 'F'
max_log_size: 102400
output: extract_metadata_from_filepath
source_identifier: attributes["log.file.path"]
type: recombine
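
Container runtimes split very long log lines (around 16KB by default for containerd) into several partial entries tagged P, with the final piece tagged F; the recombine operator stitches them back together using is_last_entry. For example, a hypothetical pair of raw lines like these:

2023-06-20T08:30:15.100000000Z stdout P this is the first half of a very long log line
2023-06-20T08:30:15.100000001Z stdout F and this is the second half

would be recombined into a single log record whose log attribute contains the complete message.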

After the above processing, the data reaches the extract_metadata_from_filepath operator, which uses a regular expression to extract metadata from the log file path and stores it in the namespace, pod_name, uid, container_name, and restart_count attributes.

id: extract_metadata_from_filepath
parse_from: attributes["log.file.path"]
regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$
type: regex_parser
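
For example, a hypothetical log file path such as the one below would yield the following attributes:

# Log file path
/var/log/pods/default_my-app-5d87f7b6c9-abcde_2f9a7b1c-1234-4d5e-8f90-abcdef123456/my-app/0.log

# Extracted attributes
namespace: default
pod_name: my-app-5d87f7b6c9-abcde
uid: 2f9a7b1c-1234-4d5e-8f90-abcdef123456
container_name: my-app
restart_count: 0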

The next step uses a series of move operators to move (or rename) fields from one location to another.

- from: attributes.stream
  to: attributes["log.iostream"]
  type: move
- from: attributes.container_name
  to: resource["k8s.container.name"]
  type: move
- from: attributes.namespace
  to: resource["k8s.namespace.name"]
  type: move
- from: attributes.pod_name
  to: resource["k8s.pod.name"]
  type: move
- from: attributes.restart_count
  to: resource["k8s.container.restart_count"]
  type: move
- from: attributes.uid
  to: resource["k8s.pod.uid"]
  type: move
- from: attributes.log
  to: body
  type: move

Finally, we can add the Loki data source to Grafana.

Loki Data Source
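
If Grafana runs in the same cluster, the data source can also be added declaratively. A minimal provisioning sketch (assuming Grafana can resolve the loki-gateway service in the kube-otel namespace) could look like this:

# loki-datasource.yaml (Grafana data source provisioning)
apiVersion: 1
datasources:
  - name: Loki
    type: loki
    access: proxy
    url: http://loki-gateway.kube-otel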

Then you can see the log data by switching to the Loki data source on Grafana’s Explore page.

Loki Logs
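
The resource attributes we promoted to labels can be used directly in LogQL queries; note that the Loki exporter replaces the dots in attribute names with underscores when creating label names. For example, a query like the one below (the namespace and filter term are just for illustration) selects logs from the kube-system namespace containing the word error:

{k8s_namespace_name="kube-system"} |= "error"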

Enabling the k8sobjects receiver

For gateway-mode collectors (the deployment-mode instance) we can also enable the k8sobjects receiver to collect Kubernetes Events data, and then update the otel-collector-deploy-values.yaml file.

# otel-collector-deploy-values.yaml
mode: deployment

# We only need one collector - more than that creates duplicate data
replicaCount: 1

presets:
  clusterMetrics:
    enabled: true
  kubernetesEvents:
    enabled: true

config:
  exporters:
    loki:
      endpoint: http://loki-gateway/loki/api/v1/push
      timeout: 10s
      read_buffer_size: 200
      write_buffer_size: 100
      retry_on_failure:
        enabled: true
        initial_interval: 10s
        max_interval: 60s
        max_elapsed_time: 10m

  service:
    pipelines:
      logs:
        exporters:
          - loki

Then re-update the OpenTelemetry Collector Deployment:

$ helm upgrade --install opentelemetry-collector-cluster ./opentelemetry-collector -f otel-collector-deploy-values.yaml --namespace kube-otel --create-namespace

Here we have turned on the kubernetesEvents preset, which corresponds to the configuration shown below.

k8sobjects:
  objects:
    - group: events.k8s.io
      mode: watch
      name: events

The k8sobjects receiver can pull or watch objects from the Kubernetes API server. Here we use group, mode, and name to specify that Kubernetes Events objects should be watched.
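
Besides watch mode, the receiver also supports a pull mode that lists objects on an interval. Purely as an illustration, a sketch that pulls pods every minute in addition to watching events might look like this (treat the extra object entry as an assumption, not part of our deployment):

k8sobjects:
  objects:
    - name: pods
      mode: pull
      interval: 60s
    - group: events.k8s.io
      mode: watch
      name: events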

Finally, we can also look up the corresponding Events log data in Loki.

Logs in Loki