Collecting Kubernetes Log Data with OpenTelemetry Collector
Previously we described how to collect metrics from a Kubernetes cluster with the OpenTelemetry Collector; next we'll take a look at how to collect log data from a cluster.
Installing Loki
First we need to deploy Loki to store the log data. Again we'll use the Helm chart for a quick deployment. Note that we don't need to deploy any log collection agents, since the OpenTelemetry Collector itself will collect the log data and send it to Loki.
$ helm repo add grafana https://grafana.github.io/helm-charts
$ helm repo update
Here we create a loki-values.yaml file to configure the Loki Helm Chart:
# loki-values.yaml
loki:
  commonConfig:
    replication_factor: 1
  auth_enabled: false
  storage:
    type: "filesystem"

singleBinary:
  replicas: 1
  persistence:
    enabled: true
    size: 10Gi
    storageClass: cfsauto

monitoring:
  lokiCanary:
    enabled: false
  selfMonitoring:
    grafanaAgent:
      installOperator: false

test:
  enabled: false

gateway:
  ingress:
    enabled: true
    ingressClassName: nginx
    tls: []
    hosts:
      - host: loki.k8s.local
        paths:
          - path: /
            pathType: Prefix
Then you can deploy Loki with a single command:
$ helm upgrade --install loki grafana/loki -f loki-values.yaml --namespace kube-otel
$ kubectl get pods -n kube-otel -l app.kubernetes.io/instance=loki
NAME READY STATUS RESTARTS AGE
loki-0 1/1 Running 0 3m52s
loki-gateway-5ffc9fbbf5-m5q75 1/1 Running 0 8m42s
$ kubectl get ingress -n kube-otel
NAME CLASS HOSTS ADDRESS PORTS AGE
loki-gateway nginx loki.k8s.local 10.98.12.94 80 11m
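Before wiring up the Collector, it's worth confirming that Loki is ready to accept writes. A quick sanity check (assuming Loki's default HTTP port 3100):

$ kubectl port-forward -n kube-otel loki-0 3100:3100 &
$ curl http://localhost:3100/ready
ready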
Enabling the filelog receiver
Next we need to configure the OpenTelemetry Collector to send log data to Loki. First update the otel-collector-ds-values.yaml file: we need to add a loki exporter and enable the filelog receiver.
# otel-collector-ds-values.yaml
mode: daemonset

presets:
  hostMetrics:
    enabled: true
  kubernetesAttributes:
    enabled: true
  kubeletMetrics:
    enabled: true
  # Enable the filelog receiver
  logsCollection:
    enabled: true

config:
  exporters:
    loki:
      endpoint: http://loki-gateway/loki/api/v1/push
      timeout: 10s # request timeout
      read_buffer_size: 200
      write_buffer_size: 100
      retry_on_failure: # retry configuration
        enabled: true
        initial_interval: 10s # initial retry interval
        max_interval: 60s # maximum retry interval
        max_elapsed_time: 10m # maximum total retry time
      default_labels_enabled:
        exporter: false
  processors:
    resource:
      attributes:
        - action: insert
          key: loki.resource.labels
          value: k8s.namespace.name,k8s.pod.name,k8s.container.name
  service:
    pipelines:
      logs:
        exporters:
          - loki
        processors:
          - memory_limiter
          - k8sattributes
          - resource
          - batch
Then re-update the OpenTelemetry Collector DaemonSet.
$ helm upgrade --install opentelemetry-collector ./opentelemetry-collector -f otel-collector-ds-values.yaml --namespace kube-otel --create-namespace
Again, to view the full rendered configuration after the update, use the command kubectl get cm opentelemetry-collector-agent -n kube-otel -oyaml.
exporters:
  logging:
    loglevel: debug
  loki:
    endpoint: http://loki-gateway/loki/api/v1/push
    timeout: 10s
    read_buffer_size: 200
    write_buffer_size: 100
    retry_on_failure:
      enabled: true
      initial_interval: 10s
      max_interval: 60s
      max_elapsed_time: 10m
    default_labels_enabled:
      exporter: false
extensions:
  health_check: {}
  memory_ballast:
    size_in_percentage: 40
processors:
  batch: {}
  k8sattributes:
    extract:
      metadata:
        - k8s.namespace.name
        - k8s.deployment.name
        - k8s.statefulset.name
        - k8s.daemonset.name
        - k8s.cronjob.name
        - k8s.job.name
        - k8s.node.name
        - k8s.pod.name
        - k8s.pod.uid
        - k8s.pod.start_time
    filter:
      node_from_env_var: K8S_NODE_NAME
    passthrough: false
    pod_association:
      - sources:
          - from: resource_attribute
            name: k8s.pod.ip
      - sources:
          - from: resource_attribute
            name: k8s.pod.uid
      - sources:
          - from: connection
  memory_limiter:
    check_interval: 5s
    limit_percentage: 80
    spike_limit_percentage: 25
  resource:
    attributes:
      - action: insert
        key: loki.resource.labels
        value: k8s.namespace.name,k8s.pod.name,k8s.container.name
receivers:
  filelog:
    exclude:
      - /var/log/pods/kube-otel_opentelemetry-collector*_*/opentelemetry-collector/*.log
    include:
      - /var/log/pods/*/*/*.log
    include_file_name: false
    include_file_path: true
    operators:
      - id: get-format
        routes:
          - expr: body matches "^\\{"
            output: parser-docker
          - expr: body matches "^[^ Z]+ "
            output: parser-crio
          - expr: body matches "^[^ Z]+Z"
            output: parser-containerd
        type: router
      - id: parser-crio
        regex: ^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
        timestamp:
          layout: 2006-01-02T15:04:05.999999999Z07:00
          layout_type: gotime
          parse_from: attributes.time
        type: regex_parser
      - combine_field: attributes.log
        combine_with: ""
        id: crio-recombine
        is_last_entry: attributes.logtag == 'F'
        max_log_size: 102400
        output: extract_metadata_from_filepath
        source_identifier: attributes["log.file.path"]
        type: recombine
      - id: parser-containerd
        regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
        timestamp:
          layout: "%Y-%m-%dT%H:%M:%S.%LZ"
          parse_from: attributes.time
        type: regex_parser
      - combine_field: attributes.log
        combine_with: ""
        id: containerd-recombine
        is_last_entry: attributes.logtag == 'F'
        max_log_size: 102400
        output: extract_metadata_from_filepath
        source_identifier: attributes["log.file.path"]
        type: recombine
      - id: parser-docker
        output: extract_metadata_from_filepath
        timestamp:
          layout: "%Y-%m-%dT%H:%M:%S.%LZ"
          parse_from: attributes.time
        type: json_parser
      - id: extract_metadata_from_filepath
        parse_from: attributes["log.file.path"]
        regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$
        type: regex_parser
      - from: attributes.stream
        to: attributes["log.iostream"]
        type: move
      - from: attributes.container_name
        to: resource["k8s.container.name"]
        type: move
      - from: attributes.namespace
        to: resource["k8s.namespace.name"]
        type: move
      - from: attributes.pod_name
        to: resource["k8s.pod.name"]
        type: move
      - from: attributes.restart_count
        to: resource["k8s.container.restart_count"]
        type: move
      - from: attributes.uid
        to: resource["k8s.pod.uid"]
        type: move
      - from: attributes.log
        to: body
        type: move
    start_at: beginning
  otlp:
    protocols:
      grpc:
        endpoint: ${env:MY_POD_IP}:4317
      http:
        endpoint: ${env:MY_POD_IP}:4318
service:
  extensions:
    - health_check
    - memory_ballast
  pipelines:
    logs:
      exporters:
        - loki
      processors:
        - memory_limiter
        - k8sattributes
        - resource
        - batch
      receivers:
        - otlp
        - filelog
# Again, only the logs-related configuration is shown; the rest is omitted ...
We have added a new loki exporter and a filelog receiver.
loki exporter
This exporter exports data to Loki via HTTP. Its main configuration options are:

- endpoint: the HTTP endpoint address for Loki (e.g. http://loki:3100/loki/api/v1/push).
- default_labels_enabled (optional): allows disabling the mapping of the default labels exporter, job, instance, and level. If default_labels_enabled is omitted, all default labels are added; if one of these labels is omitted from default_labels_enabled, it will still be added.

If all default labels are disabled and no other labels are added, log entries will be discarded, because at least one label must be present to successfully write log records to Loki. The metric otelcol_lokiexporter_send_failed_due_to_missing_labels records the number of log records discarded because no labels were specified.
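For example, to drop the exporter label while keeping the others (which is what our configuration below does), you would write something like:

loki:
  endpoint: http://loki-gateway/loki/api/v1/push
  default_labels_enabled:
    exporter: false # drop exporter="OTLP"; job, instance, and level stay enabled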
The Loki exporter can convert OTLP resource and log attributes into Loki labels and index them. To do this, hints need to be configured specifying which attributes should be set as labels. The hints are themselves attributes and are ignored when exporting to Loki. The following example uses the attributes processor to hint the Loki exporter to set the event.domain attribute as a label, and the resource processor to hint it to set service.name as a label.
processors:
  attributes:
    actions:
      - action: insert
        key: loki.attribute.labels
        value: event.domain
  resource:
    attributes:
      - action: insert
        key: loki.resource.labels
        value: service.name
Default labels are always set unless disabled by the default_labels_enabled setting:

- job=service.namespace/service.name
- instance=service.instance.id
- exporter=OTLP
- level=severity
If service.name and service.namespace exist, then job=service.namespace/service.name is set. If service.name exists and service.namespace does not, then job=service.name is set. If service.name does not exist and service.namespace does, the job label is not set. If service.instance.id exists, then instance=service.instance.id is set; if it does not, the instance label is not set.
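A quick illustration of these rules, with hypothetical resource attribute values:

# Resource attributes on an incoming log record (hypothetical):
#   service.namespace   = shop
#   service.name        = cart
#   service.instance.id = cart-7d4b9
# Resulting default Loki labels:
#   job      = shop/cart
#   instance = cart-7d4b9
#   exporter = OTLP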
Our full configuration here is as follows:
loki:
  endpoint: http://loki-gateway/loki/api/v1/push
  timeout: 10s
  read_buffer_size: 200
  write_buffer_size: 100
  retry_on_failure:
    enabled: true
    initial_interval: 10s
    max_interval: 60s
    max_elapsed_time: 10m
We have configured the timeout, read/write buffer sizes, and retries here.
The read_buffer_size and write_buffer_size fields specify the size of the OpenTelemetry exporter's read and write buffers, respectively. These buffers are used to cache data before sending it, improving sending efficiency and reliability.
The read_buffer_size field specifies the size of the buffer the exporter uses to read data from the data source. If the data source produces more data than the buffer size, the exporter reads the data in batches and caches it in the buffer until the buffer is full or the data source has no more data.
The write_buffer_size field specifies the size of the buffer the exporter uses to write data to the target. If the exporter generates more data than the buffer size, it writes the data to the target in batches, caching it in the buffer until the buffer is full or the target is unavailable.
By configuring the size of these buffers, you can control the performance and reliability of the OpenTelemetry exporter. If your data source generates a large amount of data, you can increase read_buffer_size and write_buffer_size to improve the exporter's throughput and efficiency. If your target is less stable or your network less reliable, you can reduce write_buffer_size to reduce the risk of data loss.
We also added a resource processor that converts the k8s.namespace.name, k8s.pod.name, and k8s.container.name resource attributes into Loki labels, so we can index them in Loki.
resource:
  attributes:
    - action: insert
      key: loki.resource.labels
      value: k8s.namespace.name,k8s.pod.name,k8s.container.name
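With those labels indexed, you can filter logs in Grafana with a LogQL query. Note that the Loki exporter normalizes the dots in attribute names to underscores, since Loki label names cannot contain dots. A hypothetical example (namespace and container names are placeholders):

{k8s_namespace_name="default", k8s_container_name="myapp"} |= "error"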
filelog receiver
This receiver is used to collect and parse log data from files: it reads log data from the specified files and emits it into the Collector's pipeline.
Our configuration of this receiver here is shown below:
filelog:
  exclude:
    - /var/log/pods/kube-otel_opentelemetry-collector*_*/opentelemetry-collector/*.log
  include:
    - /var/log/pods/*/*/*.log
  include_file_name: false
  include_file_path: true
  operators:
    - id: get-format
      routes:
        - expr: body matches "^\\{"
          output: parser-docker
        - expr: body matches "^[^ Z]+ "
          output: parser-crio
        - expr: body matches "^[^ Z]+Z"
          output: parser-containerd
      type: router
    - id: parser-crio
      regex: ^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
      timestamp:
        layout: 2006-01-02T15:04:05.999999999Z07:00
        layout_type: gotime
        parse_from: attributes.time
      type: regex_parser
    - combine_field: attributes.log
      combine_with: ""
      id: crio-recombine
      is_last_entry: attributes.logtag == 'F'
      max_log_size: 102400
      output: extract_metadata_from_filepath
      source_identifier: attributes["log.file.path"]
      type: recombine
    - id: parser-containerd
      regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
      timestamp:
        layout: "%Y-%m-%dT%H:%M:%S.%LZ"
        parse_from: attributes.time
      type: regex_parser
    - combine_field: attributes.log
      combine_with: ""
      id: containerd-recombine
      is_last_entry: attributes.logtag == 'F'
      max_log_size: 102400
      output: extract_metadata_from_filepath
      source_identifier: attributes["log.file.path"]
      type: recombine
    - id: parser-docker
      output: extract_metadata_from_filepath
      timestamp:
        layout: "%Y-%m-%dT%H:%M:%S.%LZ"
        parse_from: attributes.time
      type: json_parser
    - id: extract_metadata_from_filepath
      parse_from: attributes["log.file.path"]
      regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$
      type: regex_parser
    - from: attributes.stream
      to: attributes["log.iostream"]
      type: move
    - from: attributes.container_name
      to: resource["k8s.container.name"]
      type: move
    - from: attributes.namespace
      to: resource["k8s.namespace.name"]
      type: move
    - from: attributes.pod_name
      to: resource["k8s.pod.name"]
      type: move
    - from: attributes.restart_count
      to: resource["k8s.container.restart_count"]
      type: move
    - from: attributes.uid
      to: resource["k8s.pod.uid"]
      type: move
    - from: attributes.log
      to: body
      type: move
  start_at: beginning
As you can see, the configuration is quite long. First, exclude filters out log files that should not be collected (the Collector's own logs), and include specifies the log files to collect; since our Kubernetes cluster is based on the containerd runtime, the logs to collect are under /var/log/pods/*/*/*.log. Then include_file_path specifies whether to add the file path as the attribute log.file.path, and include_file_name specifies whether to add the file name as the attribute log.file.name.
start_at indicates where in the file to start reading logs at startup. The options are beginning or end, the default being end.
Then there's the all-important operators property, which specifies how the log files are processed. Operators are the most basic unit of log processing: each operator fulfils a single responsibility, such as reading lines from a file or parsing JSON from a field. Operators are then chained together into a pipeline to achieve the desired result.
For example, a user can read log lines from a file using the file_input operator. The results of this operation can then be sent to a regex_parser operator to create fields based on a regular expression. Finally, these results can be sent to a file_output operator to write the logs to a file on disk.
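The same chaining idea applies inside the filelog receiver, just without the input/output operators (the receiver itself reads the files). A minimal sketch, assuming a hypothetical application that writes "<time> <level> <message>" lines:

receivers:
  filelog:
    include:
      - /var/log/myapp/*.log # hypothetical application log path
    operators:
      # 1. Parse each line into time/level/msg attributes
      - type: regex_parser
        regex: '^(?P<time>\S+) (?P<level>[A-Z]+) (?P<msg>.*)$'
      # 2. Promote the parsed message to the log body
      - type: move
        from: attributes.msg
        to: body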
We start here by configuring a router operator:
id: get-format
routes:
  - expr: body matches "^\\{"
    output: parser-docker
  - expr: body matches "^[^ Z]+ "
    output: parser-crio
  - expr: body matches "^[^ Z]+Z"
    output: parser-containerd
type: router
This operator dynamically routes logs based on their content. In our case the cluster uses the containerd runtime, which produces log data matching body matches "^[^ Z]+Z", so the router sends it on to the parser-containerd operator.
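To see why the three expressions discriminate correctly, compare the start of a log line from each runtime (illustrative samples):

# Docker (JSON, starts with "{"):
#   {"log":"hello\n","stream":"stdout","time":"2023-06-15T02:00:00.000000000Z"}
# CRI-O (RFC3339 timestamp with a timezone offset, so no "Z" before the first space):
#   2023-06-15T10:00:00.000000000+08:00 stdout F hello
# containerd (RFC3339 timestamp in UTC, ending in "Z"):
#   2023-06-15T02:00:00.000000000Z stdout F hello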
id: parser-containerd
regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
timestamp:
  layout: "%Y-%m-%dT%H:%M:%S.%LZ"
  parse_from: attributes.time
type: regex_parser
parser-containerd is a regex_parser operator that parses the routed log data with the specified regular expression and stores the results in the time, stream, logtag, and log attributes. It also parses the time attribute into the record's timestamp.
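For example, the sample containerd line shown above would be parsed roughly as follows:

# Input line:
#   2023-06-15T02:00:00.000000000Z stdout F hello
# Resulting attributes:
#   time:   2023-06-15T02:00:00.000000000Z  (also parsed into the record timestamp)
#   stream: stdout
#   logtag: F
#   log:    hello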
Next, successive log lines are combined into a single log record using the recombine operator.
combine_field: attributes.log
combine_with: ""
id: containerd-recombine
is_last_entry: attributes.logtag == 'F'
max_log_size: 102400
output: extract_metadata_from_filepath
source_identifier: attributes["log.file.path"]
type: recombine
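containerd tags each chunk of a long line with P (partial) until the final chunk, which is tagged F (full), so is_last_entry: attributes.logtag == 'F' marks the end of a record. An illustrative example:

# Three chunks of one long application line:
#   2023-06-15T02:00:00.000000000Z stdout P this is the beginning
#   2023-06-15T02:00:00.000001000Z stdout P  of one very long
#   2023-06-15T02:00:00.000002000Z stdout F  log line
# recombine concatenates the three `log` attributes (combine_with: "")
# into a single record before passing it on.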
After the above processing, the log reaches the extract_metadata_from_filepath operator, which uses a regular expression to extract metadata from the file path and stores it in the attributes namespace, pod_name, uid, container_name, and restart_count.
id: extract_metadata_from_filepath
parse_from: attributes["log.file.path"]
regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$
type: regex_parser
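For a hypothetical log file path, the named capture groups would produce:

# Input (attributes["log.file.path"]):
#   /var/log/pods/default_myapp-0_0a1b2c3d-1111-2222-3333-444455556666/myapp/2.log
# Extracted attributes:
#   namespace:      default
#   pod_name:       myapp-0
#   uid:            0a1b2c3d-1111-2222-3333-444455556666
#   container_name: myapp
#   restart_count:  2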
The next step is to move (or rename) fields from one location to another via the move operator.
- from: attributes.stream
  to: attributes["log.iostream"]
  type: move
- from: attributes.container_name
  to: resource["k8s.container.name"]
  type: move
- from: attributes.namespace
  to: resource["k8s.namespace.name"]
  type: move
- from: attributes.pod_name
  to: resource["k8s.pod.name"]
  type: move
- from: attributes.restart_count
  to: resource["k8s.container.restart_count"]
  type: move
- from: attributes.uid
  to: resource["k8s.pod.uid"]
  type: move
- from: attributes.log
  to: body
  type: move
Finally, we can add Loki as a data source in Grafana. Then, by switching to the Loki data source on the Explore page, you can see the log data.
Enabling the k8sobjects receiver
For the gateway-mode collector we can additionally enable the k8sobjects receiver to collect Kubernetes Events data, updating the otel-collector-deploy-values.yaml file as follows.
# otel-collector-deploy-values.yaml
mode: deployment

# We only need one collector - more than that creates duplicate data
replicaCount: 1

presets:
  clusterMetrics:
    enabled: true
  kubernetesEvents:
    enabled: true

config:
  exporters:
    loki:
      endpoint: http://loki-gateway/loki/api/v1/push
      timeout: 10s
      read_buffer_size: 200
      write_buffer_size: 100
      retry_on_failure:
        enabled: true
        initial_interval: 10s
        max_interval: 60s
        max_elapsed_time: 10m
  service:
    pipelines:
      logs:
        exporters:
          - loki
Then re-update the OpenTelemetry Collector Deployment:
$ helm upgrade --install opentelemetry-collector-cluster ./opentelemetry-collector -f otel-collector-deploy-values.yaml --namespace kube-otel --create-namespace
Here we have turned on the kubernetesEvents preset, which corresponds to the configuration shown below.
k8sobjects:
  objects:
    - group: events.k8s.io
      mode: watch
      name: events
The k8sobjects receiver can be used to pull or watch objects from the Kubernetes API server; here we use group, mode, and name to specify that it should watch Kubernetes Events objects.
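Besides watch, the receiver also supports a pull mode that lists objects on an interval; a minimal sketch (the pods object here is a hypothetical choice):

k8sobjects:
  objects:
    - name: pods
      mode: pull
      interval: 60s # re-list every minute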
Finally, we can also look up the corresponding Events log data in Loki.
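Since the deployment's config above adds no label hints, the event records should only carry the default labels, so a LogQL query along the lines of the following should surface them on Grafana's Explore page:

{exporter="OTLP"}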