
I have a service deployed on multiple instances, all reporting metrics to a common collector. Each instance is identified by a triplet (service.namespace, service.name, service.instance.id). However, this setup leads to a large number of time series, because service.instance.id can have a high cardinality.

I never use this label for grouping, so I could simply remove it, but then the export to Google Cloud Stackdriver fails with clashing time series. Is there a way to aggregate the metrics in the collector before exporting them to GCP?

  1. I tried the metricstransform processor, but it seems to aggregate only on data point labels, not resource attributes.
  2. I tried to drop the service.instance.id label (with the resource processor) and then use metricstransform, but this leads to an error during the export:
2024-11-22T15:25:27.422Z    error    exporterhelper/queue_sender.go:128    Exporting failed. No more retries left. Dropping data.
{
  "kind": "exporter",
  "data_type": "metrics",
  "name": "googlecloud",
  "error": "rpc error:
    code = InvalidArgument
    desc = One or more TimeSeries could not be written:
      timeSeries[0-4,8-14]:
        write for resource=k8s_cluster{location:us-central1,cluster_name:gb-rgke-usc1-production} failed with: Points must be written in order. One or more of the points specified had an older start time than the most recent point.
        error details: name = Unknown  desc = total_point_count:27  success_point_count:2  errors:{status:{code:3}  point_count:10}", "dropped_items": 27
}
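For reference, the two attempts looked roughly like this (the metric and label names are illustrative, not my real ones):

```yaml
processors:
  # Attempt 1: metricstransform only aggregates over data point labels,
  # so the service.instance.id resource attribute is left untouched.
  metricstransform:
    transforms:
      - include: request_count  # illustrative metric name
        action: update
        operations:
          - action: aggregate_labels
            label_set: [method, status]  # illustrative datapoint labels
            aggregation_type: sum
  # Attempt 2: drop the resource attribute first, which produced the
  # "Points must be written in order" error above.
  resource/drop_instance:
    attributes:
      - key: service.instance.id
        action: delete
```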

Is there a way to achieve that?

Kind regards, Alexis


Edit

Following @Jeff's comment, the interval processor seems promising. I built a custom collector distribution and deployed it. While it seems to work (dropping the instance_id and aggregating values) in a low-traffic testing environment, once deployed in production I got multiple errors, and the charts in Stackdriver don't reflect the actual behavior (only about 10% of the requests are counted...).
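For completeness, I built the custom distribution with the OpenTelemetry Collector Builder (ocb); the manifest looked along these lines (module versions are illustrative) to pull in the interval processor:

```yaml
# builder-config.yaml (versions are illustrative)
dist:
  name: otelcol-custom
  output_path: ./otelcol-custom
receivers:
  - gomod: go.opentelemetry.io/collector/receiver/otlpreceiver v0.114.0
processors:
  - gomod: go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.114.0
  - gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.114.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.114.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.114.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor v0.114.0
exporters:
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/googlecloudexporter v0.114.0
```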

Here is (part of) my otelcol config:

receivers:
  otlp:
    protocols:
      http:
        endpoint: ${env:POD_NAME}:4318
processors:
  resourcedetection:
    detectors: [gcp]
    timeout: 10s
  batch:
  memory_limiter:
    check_interval: 1s
    limit_percentage: 65
    spike_limit_percentage: 20
  resource/merge_instances:
    attributes:
      - key: service.instance.id
        action: delete
  interval:
    interval: 15s
exporters:
  googlecloud:
    project: mygcpproject
service:
  pipelines:
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, batch, resource/merge_instances, interval, resourcedetection]
      exporters: [googlecloud]

And the output of the collector:

2024-11-28T15:12:16.349Z    info    [email protected]/service.go:166    Setting up own telemetry...
2024-11-28T15:12:16.349Z    info    telemetry/metrics.go:70    Serving metrics    {"address": "localhost:8888", "metrics level": "Normal"}
2024-11-28T15:12:16.350Z    info    builders/builders.go:26    Development component. May change in the future.    {"kind": "processor", "name": "interval", "pipeline": "metrics"}
2024-11-28T15:12:16.351Z    info    [email protected]/memorylimiter.go:151    Using percentage memory limiter    {"kind": "processor", "name": "memory_limiter", "pipeline": "logs", "total_memory_mib": 3923, "limit_percentage": 65, "spike_limit_percentage": 20}
2024-11-28T15:12:16.351Z    info    [email protected]/memorylimiter.go:75    Memory limiter configured    {"kind": "processor", "name": "memory_limiter", "pipeline": "logs", "limit_mib": 2550, "spike_limit_mib": 784, "check_interval": 1}
2024-11-28T15:12:16.353Z    info    [email protected]/service.go:238    Starting otelcol-custom...    {"Version": "0.114.0", "NumCPU": 2}
2024-11-28T15:12:16.353Z    info    extensions/extensions.go:39    Starting extensions...
2024-11-28T15:12:16.353Z    info    extensions/extensions.go:42    Extension is starting...    {"kind": "extension", "name": "health_check"}
2024-11-28T15:12:16.353Z    info    [email protected]/healthcheckextension.go:32    Starting health_check extension    {"kind": "extension", "name": "health_check", "config": {"Endpoint":"otel-collector-7698d7ddb-sncck:13133","TLSSetting":null,"CORS":null,"Auth":null,"MaxRequestBodySize":0,"IncludeMetadata":false,"ResponseHeaders":null,"CompressionAlgorithms":null,"ReadTimeout":0,"ReadHeaderTimeout":0,"WriteTimeout":0,"IdleTimeout":0,"Path":"/","ResponseBody":null,"CheckCollectorPipeline":{"Enabled":false,"Interval":"5m","ExporterFailureThreshold":5}}}
2024-11-28T15:12:16.354Z    info    extensions/extensions.go:59    Extension started.    {"kind": "extension", "name": "health_check"}
2024-11-28T15:12:17.023Z    info    internal/resourcedetection.go:126    began detecting resource information    {"kind": "processor", "name": "resourcedetection", "pipeline": "metrics"}
2024-11-28T15:12:17.035Z    info    internal/resourcedetection.go:140    detected resource information    {"kind": "processor", "name": "resourcedetection", "pipeline": "metrics", "resource": {"cloud.account.id":"mygcpproject","cloud.platform":"gcp_kubernetes_engine","cloud.provider":"gcp","cloud.region":"us-central1","host.id":"xxxx","host.name":"xxxx","k8s.cluster.name":"xxxx"}}
2024-11-28T15:12:17.037Z    info    [email protected]/otlp.go:169    Starting HTTP server    {"kind": "receiver", "name": "otlp", "data_type": "metrics", "endpoint": "otel-collector-7698d7ddb-sncck:4318"}
2024-11-28T15:12:17.037Z    info    healthcheck/handler.go:132    Health Check state change    {"kind": "extension", "name": "health_check", "status": "ready"}
2024-11-28T15:12:17.037Z    info    [email protected]/service.go:261    Everything is ready. Begin running and processing data.
2024-11-28T15:12:32.274Z    error    internal/queue_sender.go:92    Exporting failed. Dropping data.    {"kind": "exporter", "data_type": "metrics", "name": "googlecloud", "error": "rpc error: code = InvalidArgument desc = One or more TimeSeries could not be written: timeSeries[0-11]: write for resource=k8s_cluster{location:us-central1,cluster_name:xxxx} failed with: Points must be written in order. One or more of the points specified had an older start time than the most recent point.\nerror details: name = Unknown  desc = total_point_count:12  success_point_count:7  errors:{status:{code:3}  point_count:5}", "dropped_items": 12}
go.opentelemetry.io/collector/exporter/exporterhelper/internal.NewQueueSender.func1
    go.opentelemetry.io/collector/[email protected]/exporterhelper/internal/queue_sender.go:92
go.opentelemetry.io/collector/exporter/internal/queue.(*Consumers[...]).Start.func1
    go.opentelemetry.io/collector/[email protected]/internal/queue/consumers.go:43
[...]
2024-11-28T15:13:00.226Z    error    internal/queue_sender.go:92    Exporting failed. Dropping data.    {"kind": "exporter", "data_type": "metrics", "name": "googlecloud", "error": "rpc error: code = InvalidArgument desc = One or more TimeSeries could not be written: timeSeries[4]: Field timeSeries[4] had an invalid value: Duplicate TimeSeries encountered. Only one point can be written per TimeSeries per request.; timeSeries[6]: Field timeSeries[6] had an invalid value: Duplicate TimeSeries encountered. Only one point can be written per TimeSeries per request.; timeSeries[7]: Field timeSeries[7] had an invalid value: Duplicate TimeSeries encountered. Only one point can be written per TimeSeries per request.; timeSeries[5]: Field timeSeries[5] had an invalid value: Duplicate TimeSeries encountered. Only one point can be written per TimeSeries per request.; timeSeries[9]: Field timeSeries[9] had an invalid value: Duplicate TimeSeries encountered. Only one point can be written per TimeSeries per request.; timeSeries[8]: Field timeSeries[8] had an invalid value: Duplicate TimeSeries encountered. Only one point can be written per TimeSeries per request.\nerror details: name = Unknown  desc = total_point_count:10  errors:{status:{code:3}  point_count:6}", "dropped_items": 10}
go.opentelemetry.io/collector/exporter/exporterhelper/internal.NewQueueSender.func1
    go.opentelemetry.io/collector/[email protected]/exporterhelper/internal/queue_sender.go:92
go.opentelemetry.io/collector/exporter/internal/queue.(*Consumers[...]).Start.func1
    go.opentelemetry.io/collector/[email protected]/internal/queue/consumers.go:43

asked Nov 22, 2024 at 15:29 by AlexisBRENON, edited Nov 28, 2024 at 15:30
  • Have you already checked this Google Cloud documentation on getting started with the OpenTelemetry Collector? Also this documentation might be helpful for you. – HerPat Commented Nov 22, 2024 at 22:51
  • @HerPat thanks for the pointers. The first one is more centered on the Managed Prometheus exporter, while I use the Google Cloud exporter; would you recommend migrating to Managed Prometheus? The second one is very interesting, as automatic aggregation is what I want, but I cannot find how to use it with a prebuilt exporter like the one I use. – AlexisBRENON Commented Nov 25, 2024 at 9:07
  • If you're not using delta metrics, you could try intervalprocessor, which should work across batches. I believe you'll still need to drop service.instance.id beforehand. – Jeff Commented Nov 25, 2024 at 20:28
  • @Jeff This looks exactly like what I was searching for. However, it isn't part of any distribution yet. I found how to build my own distribution, ~~but not how to package it as a Docker image, do you have any pointer for that?~~ My bad, I found it: github.com/open-telemetry/opentelemetry-collector-releases/tree/… – AlexisBRENON Commented Nov 26, 2024 at 8:50

1 Answer


I believe I found the right setup to handle my use case.

receivers:
  otlp:
    protocols:
      http:
        endpoint: ${env:POD_NAME}:4318
processors:
  memory_limiter:
    check_interval: 1s
    limit_percentage: 65
    spike_limit_percentage: 20
  interval:
    interval: 10s
  resourcedetection:
    detectors: [gcp]
    timeout: 10s
  cumulativetodelta:
    max_staleness: 24h
  transform/resource:
    error_mode: ignore
    metric_statements:
    - context: "resource"
      statements:
      - set(attributes["service.instance.id"], attributes["service.namespace"])  # Override instance ID to allow aggregation
      - set(attributes["k8s.namespace.name"], "GMP namespace")  # Set namespace for Managed Prometheus export
  groupbyattrs:  # Group all metrics from the same group of publishers (discard the service instance ID)
    keys:
      - service.name
      - service.version
      - service.namespace
  transform/aggregate:  # Aggregate metrics
    error_mode: ignore
    metric_statements:
    - context: resource
      statements: []
    - context: datapoint
      statements:
      - set(time, TruncateTime(Now(), Duration("10s")))  # Align timestamps to allow aggregation
      - set(start_time, TruncateTime(start_time, Duration("10s")))  # Align timestamps to allow aggregation
      - delete_key(attributes, "http.host")
      - delete_key(attributes, "net.host.port")
      - delete_key(attributes, "http.server_name")
      - delete_key(attributes, "server.address")
      - delete_key(attributes, "server.port")
    - context: metric
      statements:
      - aggregate_on_attributes("sum") where type != METRIC_DATA_TYPE_GAUGE
      - aggregate_on_attributes("mean") where type == METRIC_DATA_TYPE_GAUGE
  deltatocumulative:
exporters:
  googlemanagedprometheus:
service:
  pipelines:
    metrics:
      receivers: [otlp]
      processors:
      - memory_limiter
      - interval  # Throttle publish rate to match Google Managed Prometheus limits
      - resourcedetection  # Add some required resources attributes for GCP
      - cumulativetodelta  # Convert to delta to support horizontal downscaling of the service
      - transform/resource  # Drop (update) the service.instance.id to group all the producers
      - groupbyattrs  # Group all timeseries coming from the same group of producers
      - transform/aggregate  # Aggregate timeseries in each group (requires to align timestamps and to drop unused attributes)
      - deltatocumulative  # Convert back to cumulative to match GCP expectations
      exporters:
      - googlemanagedprometheus

I hope this can help someone else.
