# Kafka

## Overview
The Parseable Kafka Connector enables log ingestion from Apache Kafka into Parseable, providing a high-performance, scalable, and efficient logging pipeline.
## Features
- Consumer & Producer Support: Supports both consuming and producing messages (producer support is ready for use with a dead letter topic, DLT).
- Configurable Buffering & Performance Settings: Optimized for high-throughput data processing.
- Security Integration: Supports SSL/TLS and SASL authentication.
- Fault Tolerance & Partitioning: Handles partition balancing, offsets, and error handling.
## Configuration Options

### General Kafka Configuration
Parameter | Environment Variable | Default Value | Description | Usage |
---|---|---|---|---|
--bootstrap-servers | P_KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Comma-separated list of Kafka bootstrap servers. | Specifies the Kafka brokers the client should connect to. |
--client-id | P_KAFKA_CLIENT_ID | parseable-connect | Client ID for Kafka connection. | Identifies the client instance in Kafka logs. |
--partition-listener-concurrency | P_KAFKA_PARTITION_LISTENER_CONCURRENCY | 2 | Number of parallel threads for Kafka partition listeners. | Determines the number of threads used to process Kafka partitions. |
--bad-data-policy | P_CONNECTOR_BAD_DATA_POLICY | fail | Policy for handling bad data. | Determines how the client should handle corrupt or invalid messages. Options: fail, drop (not yet supported), dlt (not yet supported). |
- All parameters can be set using command-line arguments or environment variables.
- Environment variables take precedence over default values.
- When configuring both producer and consumer, make sure to specify relevant options in their respective sections.
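As an illustration, the two forms below are equivalent ways to point the connector at a broker cluster. The `parseable kafka` invocation is a hypothetical placeholder; substitute your actual launch command:

```bash
# Form 1: command-line flags (launch command shown is a placeholder)
parseable kafka --bootstrap-servers "broker1:9092,broker2:9092" --client-id "parseable-connect"

# Form 2: environment variables, equivalent to the flags above
export P_KAFKA_BOOTSTRAP_SERVERS="broker1:9092,broker2:9092"
export P_KAFKA_CLIENT_ID="parseable-connect"
```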
For more details, refer to Kafka's official documentation.
### Consumer Configuration
Parameter | Environment Variable | Default Value | Description | Usage |
---|---|---|---|---|
--consumer-topics | P_KAFKA_CONSUMER_TOPICS | None | Comma-separated list of topics to consume from. | Specify the Kafka topics the consumer should subscribe to. |
--consumer-group-id | P_KAFKA_CONSUMER_GROUP_ID | parseable-connect-cg | The consumer group ID. | Used to group consumers for load balancing and fault tolerance. |
--buffer-size | P_KAFKA_CONSUMER_BUFFER_SIZE | 10000 | Size of the buffer for batching records per partition. | Controls the number of messages buffered before processing. |
--buffer-timeout | P_KAFKA_CONSUMER_BUFFER_TIMEOUT | 10000 | Timeout for buffer flush in milliseconds. | Defines the time to wait before flushing buffered messages. |
--consumer-group-instance-id | P_KAFKA_CONSUMER_GROUP_INSTANCE_ID | parseable-connect-cg-ii-{random} | Group instance ID for static membership. | Useful for maintaining static assignments in consumer groups. |
--consumer-partition-strategy | P_KAFKA_CONSUMER_PARTITION_STRATEGY | roundrobin,range | Partition assignment strategy. | Determines how partitions are assigned among consumers. |
--consumer-session-timeout | P_KAFKA_CONSUMER_SESSION_TIMEOUT | 60000 | Session timeout in milliseconds. | Time before a consumer is considered inactive. |
--consumer-heartbeat-interval | P_KAFKA_CONSUMER_HEARTBEAT_INTERVAL | 3000 | Heartbeat interval in milliseconds. | Frequency at which consumers send heartbeats. |
--consumer-max-poll-interval | P_KAFKA_CONSUMER_MAX_POLL_INTERVAL | 300000 | Maximum poll interval in milliseconds. | Maximum time between poll calls before the consumer is considered dead. |
--consumer-enable-auto-offset-store | P_KAFKA_CONSUMER_ENABLE_AUTO_OFFSET_STORE | true | Enable auto offset store. | Determines whether offsets are automatically stored after processing messages. |
--consumer-auto-offset-reset | P_KAFKA_CONSUMER_AUTO_OFFSET_RESET | earliest | Auto offset reset behavior. | Determines whether consumption starts from the earliest or the latest offset when no committed offset exists. |
--consumer-fetch-min-bytes | P_KAFKA_CONSUMER_FETCH_MIN_BYTES | 1 | Minimum bytes to fetch. | The smallest amount of data the broker should send. |
--consumer-fetch-max-bytes | P_KAFKA_CONSUMER_FETCH_MAX_BYTES | 52428800 | Maximum bytes to fetch. | The maximum amount of data fetched in a single request. |
--consumer-fetch-max-wait | P_KAFKA_CONSUMER_FETCH_MAX_WAIT | 500 | Maximum wait time for fetch in milliseconds. | Maximum time the broker should wait before sending data. |
--consumer-max-partition-fetch-bytes | P_KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES | 1048576 | Maximum bytes to fetch per partition. | Limits the maximum data fetched per partition. |
--consumer-queued-min-messages | P_KAFKA_CONSUMER_QUEUED_MIN_MESSAGES | 100000 | Minimum messages to queue. | Controls the minimum number of messages buffered in the consumer. |
--consumer-queued-max-messages-kbytes | P_KAFKA_CONSUMER_QUEUED_MAX_MESSAGES_KBYTES | 65536 | Maximum message queue size in KBytes. | Determines the maximum queue size in kilobytes. |
--consumer-enable-partition-eof | P_KAFKA_CONSUMER_ENABLE_PARTITION_EOF | false | Enable partition EOF. | Signals when the end of a partition is reached. |
--consumer-check-crcs | P_KAFKA_CONSUMER_CHECK_CRCS | false | Check CRCs on messages. | Ensures message integrity by verifying CRCs. |
--consumer-isolation-level | P_KAFKA_CONSUMER_ISOLATION_LEVEL | read_committed | Transaction isolation level. | Controls whether uncommitted transactions should be visible to the consumer. |
--consumer-fetch-message-max-bytes | P_KAFKA_CONSUMER_FETCH_MESSAGE_MAX_BYTES | 1048576 | Maximum bytes per message. | Defines the largest individual message the consumer can fetch. |
--consumer-stats-interval | P_KAFKA_CONSUMER_STATS_INTERVAL | 10000 | Statistics interval in milliseconds. | Defines the frequency at which consumer statistics are collected. |
- All parameters can be set using command-line arguments or environment variables.
- Environment variables take precedence over default values.
- Some values, such as `group_instance_id`, are dynamically generated if not explicitly provided.
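For instance, a minimal consumer setup might look like the sketch below; the topic names are placeholders, and every unset option keeps its default from the table above:

```bash
# Topics to subscribe to (placeholder names)
export P_KAFKA_CONSUMER_TOPICS="app-logs,audit-logs"

# Consumer group shared by all connector instances for load balancing
export P_KAFKA_CONSUMER_GROUP_ID="parseable-connect-cg"

# On first run (no committed offset), start from the beginning of each partition
export P_KAFKA_CONSUMER_AUTO_OFFSET_RESET="earliest"

# Flush a partition buffer after 10000 records or 10000 ms, whichever comes first
export P_KAFKA_CONSUMER_BUFFER_SIZE="10000"
export P_KAFKA_CONSUMER_BUFFER_TIMEOUT="10000"
```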
For more details, refer to Kafka's official documentation on consumer configurations.
### Producer Configuration

Producer configuration is not required at the moment; these settings will take effect once DLT (dead letter topic) support is implemented.
Parameter | Environment Variable | Default Value | Description | Usage |
---|---|---|---|---|
--producer-acks | P_KAFKA_PRODUCER_ACKS | all | Number of acknowledgments the producer requires. | Determines when a message is considered successfully sent (0, 1, all). |
--producer-compression-type | P_KAFKA_PRODUCER_COMPRESSION_TYPE | lz4 | Compression type for messages. | Determines how messages are compressed (none, gzip, snappy, lz4, zstd). |
--producer-batch-size | P_KAFKA_PRODUCER_BATCH_SIZE | 16384 | Maximum size of a request in bytes. | Defines the size of batches sent to Kafka. |
--producer-linger-ms | P_KAFKA_PRODUCER_LINGER_MS | 5 | Delay to wait for more messages in the same batch. | Controls latency vs. throughput trade-off. |
--producer-message-timeout-ms | P_KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS | 120000 | Local message timeout. | Time before an unacknowledged message is dropped. |
--producer-max-inflight | P_KAFKA_PRODUCER_MAX_INFLIGHT | 5 | Maximum number of in-flight requests per connection. | Controls how many messages can be sent without acknowledgment. |
--producer-message-max-bytes | P_KAFKA_PRODUCER_MESSAGE_MAX_BYTES | 1048576 | Maximum size of a message in bytes. | Restricts the maximum message size that can be sent. |
--producer-enable-idempotence | P_KAFKA_PRODUCER_ENABLE_IDEMPOTENCE | true | Enable idempotent producer. | Ensures exactly-once delivery guarantees. |
--producer-transaction-timeout-ms | P_KAFKA_PRODUCER_TRANSACTION_TIMEOUT_MS | 60000 | Transaction timeout. | Maximum time for a transaction before it times out. |
--producer-buffer-memory | P_KAFKA_PRODUCER_BUFFER_MEMORY | 33554432 | Total bytes of memory the producer can use. | Limits the memory available for buffering messages. |
--producer-retry-backoff-ms | P_KAFKA_PRODUCER_RETRY_BACKOFF_MS | 100 | Time to wait before retrying a failed request. | Defines back-off time for retries. |
--producer-request-timeout-ms | P_KAFKA_PRODUCER_REQUEST_TIMEOUT_MS | 30000 | Time to wait for a response from brokers. | Limits how long the producer waits for broker acknowledgment. |
--producer-queue-buffering-max-messages | P_KAFKA_PRODUCER_QUEUE_BUFFERING_MAX_MESSAGES | 100000 | Maximum number of messages allowed on the producer queue. | Prevents excessive message buffering. |
--producer-queue-buffering-max-kbytes | P_KAFKA_PRODUCER_QUEUE_BUFFERING_MAX_KBYTES | 1048576 | Maximum total message size sum allowed on the producer queue. | Restricts the producer queue's total size. |
--producer-delivery-timeout-ms | P_KAFKA_PRODUCER_DELIVERY_TIMEOUT_MS | 120000 | Maximum time to report success or failure after send. | Defines the upper bound on message delivery time. |
--producer-max-retries | P_KAFKA_PRODUCER_MAX_RETRIES | 2147483647 | Maximum number of retries per message. | Controls how many times a message is retried before failing. |
--producer-retry-backoff-max-ms | P_KAFKA_PRODUCER_RETRY_BACKOFF_MAX_MS | 1000 | Maximum back-off time between retries. | Ensures retries are not too frequent. |
- All parameters can be set using command-line arguments or environment variables.
- Environment variables take precedence over default values.
- Certain parameters, such as `--producer-acks` and `--producer-enable-idempotence`, affect message durability and reliability.
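Once DLT support lands, a durability-oriented producer sketch could look like the following; the values shown simply restate the defaults from the table above:

```bash
# Require acknowledgment from all in-sync replicas before a send counts as successful
export P_KAFKA_PRODUCER_ACKS="all"

# Idempotence avoids duplicate messages when sends are retried
export P_KAFKA_PRODUCER_ENABLE_IDEMPOTENCE="true"

# A small linger improves batching; lz4 keeps compression overhead low
export P_KAFKA_PRODUCER_LINGER_MS="5"
export P_KAFKA_PRODUCER_COMPRESSION_TYPE="lz4"
```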
For more details, refer to Kafka's official documentation on producer configurations.
### Security Configuration
Parameter | Environment Variable | Default Value | Description | Usage |
---|---|---|---|---|
--security-protocol | P_KAFKA_SECURITY_PROTOCOL | PLAINTEXT | Security protocol used for communication. | Determines whether SSL, SASL, or plaintext is used. |
--ssl-ca-location | P_KAFKA_SSL_CA_LOCATION | None | CA certificate file path. | Required when using SSL or SASL_SSL. |
--ssl-certificate-location | P_KAFKA_SSL_CERTIFICATE_LOCATION | None | Client certificate file path. | Required when using SSL or SASL_SSL. |
--ssl-key-location | P_KAFKA_SSL_KEY_LOCATION | None | Client key file path. | Required when using SSL or SASL_SSL. |
--ssl-key-password | P_KAFKA_SSL_KEY_PASSWORD | None | SSL key password. | Used if the SSL key is password protected. |
--sasl-mechanism | P_KAFKA_SASL_MECHANISM | None | SASL authentication mechanism. | Required when using SASL_SSL or SASL_PLAINTEXT (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER). |
--sasl-username | P_KAFKA_SASL_USERNAME | None | SASL username. | Required for PLAIN or SCRAM SASL mechanisms. |
--sasl-password | P_KAFKA_SASL_PASSWORD | None | SASL password. | Required for PLAIN or SCRAM SASL mechanisms. |
--kerberos-service-name | P_KAFKA_KERBEROS_SERVICE_NAME | None | Kerberos service name. | Required when using GSSAPI SASL mechanism. |
--kerberos-principal | P_KAFKA_KERBEROS_PRINCIPAL | None | Kerberos principal. | Used for Kerberos authentication. |
--kerberos-keytab | P_KAFKA_KERBEROS_KEYTAB | None | Path to Kerberos keytab file. | Required when using Kerberos authentication. |
--oauth-token-endpoint | P_KAFKA_OAUTH_TOKEN_ENDPOINT | None | OAuth Bearer token endpoint. | Required when using OAUTHBEARER SASL mechanism. |
--oauth-client-id | P_KAFKA_OAUTH_CLIENT_ID | None | OAuth client ID. | Used for authentication with an OAuth provider. |
--oauth-client-secret | P_KAFKA_OAUTH_CLIENT_SECRET | None | OAuth client secret. | Used to authenticate the OAuth client. |
--oauth-scope | P_KAFKA_OAUTH_SCOPE | None | OAuth scope. | Defines the permissions requested from the OAuth provider. |
### Security Configuration Combinations

#### Plaintext Communication (No Security)

```bash
--security-protocol=PLAINTEXT
```

No additional parameters are required.

#### SSL Encryption

```bash
--security-protocol=SSL
```

Required parameters:

- `--ssl-ca-location`
- `--ssl-certificate-location`
- `--ssl-key-location`
- `--ssl-key-password` (if the key is password-protected)

#### SASL Authentication with SSL

```bash
--security-protocol=SASL_SSL
```

Required parameters:

- `--sasl-mechanism`
- `--sasl-username` and `--sasl-password` (for PLAIN or SCRAM mechanisms)
- `--kerberos-service-name` and `--kerberos-principal` (for GSSAPI mechanism)
- `--kerberos-keytab` (if using Kerberos authentication)
- SSL parameters (if required by the security policy)

#### SASL Authentication without SSL

```bash
--security-protocol=SASL_PLAINTEXT
```

Required parameters:

- `--sasl-mechanism`
- `--sasl-username` and `--sasl-password` (for PLAIN or SCRAM mechanisms)
- `--kerberos-service-name` and `--kerberos-principal` (for GSSAPI mechanism)
- `--kerberos-keytab` (if using Kerberos authentication)

#### OAuth Bearer Token Authentication (Not supported yet)

```bash
--security-protocol=SASL_SSL   # or SASL_PLAINTEXT
--sasl-mechanism=OAUTHBEARER
```

Required parameters:

- `--oauth-token-endpoint`
- `--oauth-client-id`
- `--oauth-client-secret`
- `--oauth-scope` (if required by the OAuth provider)

### Examples

#### SSL Configuration

```bash
export P_KAFKA_SECURITY_PROTOCOL="SSL"
export P_KAFKA_SSL_CA_LOCATION="/path/to/ca.pem"
export P_KAFKA_SSL_CERTIFICATE_LOCATION="/path/to/client-cert.pem"
export P_KAFKA_SSL_KEY_LOCATION="/path/to/client-key.pem"
export P_KAFKA_SSL_KEY_PASSWORD="my-secure-password"
```

#### SASL Configuration

```bash
export P_KAFKA_SECURITY_PROTOCOL="SASL_SSL"
export P_KAFKA_SASL_MECHANISM="SCRAM-SHA-512"
export P_KAFKA_SASL_USERNAME="my-user"
export P_KAFKA_SASL_PASSWORD="my-password"
```
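#### SASL_PLAINTEXT Configuration

For completeness, a setup without transport encryption follows the same pattern (credentials shown are placeholders). Since the PLAIN mechanism transmits credentials unencrypted over this protocol, prefer SASL_SSL where possible:

```bash
export P_KAFKA_SECURITY_PROTOCOL="SASL_PLAINTEXT"
export P_KAFKA_SASL_MECHANISM="PLAIN"
export P_KAFKA_SASL_USERNAME="my-user"
export P_KAFKA_SASL_PASSWORD="my-password"
```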
For more details, refer to Kafka's official security documentation.
## Concurrency and Multi-Instance Processing

The connector uses a dedicated thread per partition, with configurable concurrency:

```bash
export P_KAFKA_PARTITION_LISTENER_CONCURRENCY="2"
```

Thread Assignment Formula:

```
Threads per ingestor = min(Partitions per ingestor, Configured threads)
```
Example Scenarios:
Balanced Configuration:
- 6 partitions
- 2 ingest nodes
- 3 threads per node
Result: Each thread handles one partition
Over-threaded Configuration:
- 4 partitions
- 2 ingest nodes
- 4 threads per node
Result: 2 threads per node will be idle
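Applying the formula to the balanced scenario: each node is assigned 3 of the 6 partitions, so configuring 3 threads yields min(3, 3) = 3 active threads per node:

```bash
# 6 partitions across 2 ingest nodes = 3 partitions per node; match the thread count
export P_KAFKA_PARTITION_LISTENER_CONCURRENCY="3"
```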
## Consumer Group Rebalancing
Rebalance triggers:
- New consumer joins
- Existing consumer leaves
- Network issues
- Partition reassignment
Available strategies:
- Range: Assigns consecutive partitions
- RoundRobin: Distributes evenly
- Sticky: Minimizes reassignments (not recommended for Parseable due to rdkafka issues)
- CooperativeSticky: Incremental, cooperative rebalancing (not recommended for Parseable because rdkafka support is lacking)
Recommended configuration (the default):

```bash
export P_KAFKA_CONSUMER_PARTITION_STRATEGY="roundrobin,range"
export P_KAFKA_CONSUMER_SESSION_TIMEOUT="60000"
export P_KAFKA_CONSUMER_HEARTBEAT_INTERVAL="3000"
```
## Metrics

All metrics listed in librdkafka's statistics documentation are exposed at the `/metrics` endpoint.
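For example, the exposed statistics can be scraped directly; the host and port below are assumptions, so substitute your server's actual address:

```bash
# Fetch the Prometheus-format metrics (address shown is an assumption)
curl -s http://localhost:8000/metrics
```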
## Architecture and Design

### Core Components
- KafkaStreams: Manages consumers and partitions.
- StreamWorker: Processes records per partition.
- ParseableSinkProcessor: Transforms messages and sinks them to Parseable.
- RebalanceListener: Handles partition rebalancing.
- Metrics Collector: Provides Prometheus metrics.
- Security Layer: Configurable authentication.
## Error Handling
Error Type | Handling Strategy |
---|---|
Connection Errors | Retries every 1 second |
Fatal Errors | Stops the pipeline |
Auth Errors | Stops the pipeline |
## Best Practices

Performance Tuning:
- More partitions = Better parallelism
- Use RoundRobin partition assignment
- Increase the partition count when consumer lag is high (see the example below)
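For the last point, partitions can be added with the standard Kafka CLI; the topic name and broker address are placeholders. Note that Kafka allows increasing a topic's partition count but never decreasing it:

```bash
# Raise the partition count of an existing topic to enable more parallel consumers
kafka-topics.sh --alter \
  --bootstrap-server localhost:9092 \
  --topic app-logs \
  --partitions 12
```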
Security:
- Prefer SSL over SASL_PLAINTEXT
- Rotate certificates regularly
- Monitor authentication failures