First, create a new YAML file for the collector configuration. Use the aws_s3 Redpanda Connect input:
input:
  aws_s3:
    bucket: my-bucket
    region: us-east-1
    prefix: ${!timestamp_unix().ts_round("1h".parse_duration()).ts_unix()}/
This section tells Redpanda Connect to read objects from your S3 bucket in the specified region, filtering by prefix. The Bloblang interpolation in the prefix rounds the current Unix timestamp to the hour, so it resolves to an hour-aligned value such as 1717243200/. Run the collector as a cron job every hour to ingest that hour's data into Metering & Billing, and adjust the prefix to match how your objects are keyed.
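For example, a minimal crontab entry could look like the sketch below. It assumes the collector configuration is saved as /etc/openmeter/collector.yaml and that Redpanda Connect is invoked through the rpk CLI; both the path and the invocation are placeholders to adapt to your environment:

# Run the collector at the top of every hour
0 * * * * rpk connect run /etc/openmeter/collector.yaml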
Next, configure the mapping from your schema to CloudEvents using Bloblang:
pipeline:
  processors:
    - mapping: |
        root = {
          "id": this.id,
          "specversion": "1.0",
          "type": "your-usage-event-type",
          "source": "s3",
          "time": this.time,
          "subject": this.subject_field,
          "data": {
            "data": this.data_field,
          },
        }
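To make the mapping concrete, suppose an object in the bucket contains a JSON record like the following (the field names are placeholders standing in for your own schema):

{"id": "evt-123", "time": "2024-06-01T12:00:00Z", "subject_field": "customer-1", "data_field": 42}

The mapping above would produce this CloudEvent:

{
  "id": "evt-123",
  "specversion": "1.0",
  "type": "your-usage-event-type",
  "source": "s3",
  "time": "2024-06-01T12:00:00Z",
  "subject": "customer-1",
  "data": {
    "data": 42
  }
}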
Finally, configure the output:
output:
  label: 'openmeter'
  drop_on:
    error: false
    error_patterns:
      - Bad Request
    output:
      http_client:
        url: '${OPENMETER_URL:https://us.api.konghq.com}/v3/openmeter/events'
        verb: POST
        headers:
          Authorization: 'Bearer $KONNECT_SYSTEM_ACCESS_TOKEN'
          Content-Type: 'application/json'
        timeout: 30s
        retry_period: 15s
        retries: 3
        max_retry_backoff: 1m
        max_in_flight: 64
        batch_as_multipart: false
        drop_on:
          - 400
        batching:
          count: 100
          period: 1s
          processors:
            - metric:
                type: counter
                name: openmeter_events_sent
                value: 1
            - archive:
                format: json_array
        dump_request_log_level: DEBUG
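Two details worth noting: the drop_on wrapper (together with the 400 status code in the inner drop_on list) discards batches the API permanently rejects as Bad Request, so one malformed batch cannot stall the pipeline, while other failures are retried up to three times with backoff. The archive processor serializes each batch of up to 100 events into a single JSON array, so a request body sent to the events endpoint looks roughly like this (illustrative values):

[
  {"id": "evt-123", "specversion": "1.0", "type": "your-usage-event-type", "source": "s3", "time": "2024-06-01T12:00:00Z", "subject": "customer-1", "data": {"data": 42}},
  {"id": "evt-124", "specversion": "1.0", "type": "your-usage-event-type", "source": "s3", "time": "2024-06-01T12:00:30Z", "subject": "customer-2", "data": {"data": 7}}
]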
Replace $KONNECT_SYSTEM_ACCESS_TOKEN with your own Konnect system access token, or supply it from the environment using the same ${VAR} interpolation syntax shown for OPENMETER_URL.
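Before scheduling the cron job, you can verify the configuration with a one-off run. A minimal sketch, assuming the token is left in the config as the interpolation ${KONNECT_SYSTEM_ACCESS_TOKEN} and that Redpanda Connect is available through rpk:

# One-off test run; replace the placeholder with a real token
export KONNECT_SYSTEM_ACCESS_TOKEN='<your-system-access-token>'
rpk connect run ./collector.yaml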