Kafka

Overview

The Parseable Kafka Connector enables log ingestion from Apache Kafka into Parseable, providing a high-performance, scalable, and efficient logging pipeline.

Features

  • Consumer & Producer Support: Consumes messages from Kafka today; producer support is in place for use with a dead-letter topic (DLT) once that feature is available.
  • Configurable Buffering & Performance Settings: Optimized for high-throughput data processing.
  • Security Integration: Supports SSL/TLS and SASL authentication.
  • Fault Tolerance & Partitioning: Handles partition balancing, offsets, and error handling.

Configuration Options

General Kafka Configuration

| Parameter | Environment Variable | Default Value | Description | Usage |
|---|---|---|---|---|
| --bootstrap-servers | P_KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Comma-separated list of Kafka bootstrap servers. | Specifies the Kafka brokers the client should connect to. |
| --client-id | P_KAFKA_CLIENT_ID | parseable-connect | Client ID for Kafka connection. | Identifies the client instance in Kafka logs. |
| --partition-listener-concurrency | P_KAFKA_PARTITION_LISTENER_CONCURRENCY | 2 | Number of parallel threads for Kafka partition listeners. | Determines the number of threads used to process Kafka partitions. |
| --bad-data-policy | P_CONNECTOR_BAD_DATA_POLICY | fail | Policy for handling bad data. | Determines how the client should handle corrupt or invalid messages. Options: fail, drop (not yet supported), dlt (not yet supported). |

  • All parameters can be set using command-line arguments or environment variables.
  • Environment variables take precedence over default values.
  • When configuring both producer and consumer, make sure to specify relevant options in their respective sections.

For more details, refer to Kafka's official documentation.
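
As a minimal sketch of the general settings above, the environment variables could be exported before starting Parseable. The broker addresses are hypothetical placeholders; the other values are simply the documented defaults written out explicitly.

```shell
# Hypothetical broker addresses; replace with your own cluster.
export P_KAFKA_BOOTSTRAP_SERVERS="broker-1:9092,broker-2:9092"
# The remaining values repeat the documented defaults.
export P_KAFKA_CLIENT_ID="parseable-connect"
export P_KAFKA_PARTITION_LISTENER_CONCURRENCY="2"
export P_CONNECTOR_BAD_DATA_POLICY="fail"
```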

Consumer Configuration

| Parameter | Environment Variable | Default Value | Description | Usage |
|---|---|---|---|---|
| --consumer-topics | P_KAFKA_CONSUMER_TOPICS | None | Comma-separated list of topics to consume from. | Specifies the Kafka topics the consumer should subscribe to. |
| --consumer-group-id | P_KAFKA_CONSUMER_GROUP_ID | parseable-connect-cg | The consumer group ID. | Used to group consumers for load balancing and fault tolerance. |
| --buffer-size | P_KAFKA_CONSUMER_BUFFER_SIZE | 10000 | Size of the buffer for batching records per partition. | Controls the number of messages buffered before processing. |
| --buffer-timeout | P_KAFKA_CONSUMER_BUFFER_TIMEOUT | 10000ms | Timeout for buffer flush in milliseconds. | Defines the time to wait before flushing buffered messages. |
| --consumer-group-instance-id | P_KAFKA_CONSUMER_GROUP_INSTANCE_ID | parseable-connect-cg-ii-{random} | Group instance ID for static membership. | Useful for maintaining static assignments in consumer groups. |
| --consumer-partition-strategy | P_KAFKA_CONSUMER_PARTITION_STRATEGY | roundrobin,range | Partition assignment strategy. | Determines how partitions are assigned among consumers. |
| --consumer-session-timeout | P_KAFKA_CONSUMER_SESSION_TIMEOUT | 60000 | Session timeout in milliseconds. | Time before a consumer is considered inactive. |
| --consumer-heartbeat-interval | P_KAFKA_CONSUMER_HEARTBEAT_INTERVAL | 3000 | Heartbeat interval in milliseconds. | Frequency at which consumers send heartbeats. |
| --consumer-max-poll-interval | P_KAFKA_CONSUMER_MAX_POLL_INTERVAL | 300000 | Maximum poll interval in milliseconds. | Maximum time between poll calls before the consumer is considered dead. |
| --consumer-enable-auto-offset-store | P_KAFKA_CONSUMER_ENABLE_AUTO_OFFSET_STORE | true | Enable auto offset store. | Determines whether offsets are automatically stored after processing messages. |
| --consumer-auto-offset-reset | P_KAFKA_CONSUMER_AUTO_OFFSET_RESET | earliest | Auto offset reset behavior. | Determines where to start reading when no committed offset exists: the beginning of the partition (earliest) or only new messages (latest). |
| --consumer-fetch-min-bytes | P_KAFKA_CONSUMER_FETCH_MIN_BYTES | 1 | Minimum bytes to fetch. | The smallest amount of data the broker should send. |
| --consumer-fetch-max-bytes | P_KAFKA_CONSUMER_FETCH_MAX_BYTES | 52428800 | Maximum bytes to fetch. | The maximum amount of data fetched in a single request. |
| --consumer-fetch-max-wait | P_KAFKA_CONSUMER_FETCH_MAX_WAIT | 500 | Maximum wait time for fetch in milliseconds. | Maximum time the broker should wait before sending data. |
| --consumer-max-partition-fetch-bytes | P_KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES | 1048576 | Maximum bytes to fetch per partition. | Limits the maximum data fetched per partition. |
| --consumer-queued-min-messages | P_KAFKA_CONSUMER_QUEUED_MIN_MESSAGES | 100000 | Minimum messages to queue. | Controls the minimum number of messages buffered in the consumer. |
| --consumer-queued-max-messages-kbytes | P_KAFKA_CONSUMER_QUEUED_MAX_MESSAGES_KBYTES | 65536 | Maximum message queue size in KBytes. | Determines the maximum queue size in kilobytes. |
| --consumer-enable-partition-eof | P_KAFKA_CONSUMER_ENABLE_PARTITION_EOF | false | Enable partition EOF. | Signals when the end of a partition is reached. |
| --consumer-check-crcs | P_KAFKA_CONSUMER_CHECK_CRCS | false | Check CRCs on messages. | Ensures message integrity by verifying CRCs. |
| --consumer-isolation-level | P_KAFKA_CONSUMER_ISOLATION_LEVEL | read_committed | Transaction isolation level. | Controls whether uncommitted transactions should be visible to the consumer. |
| --consumer-fetch-message-max-bytes | P_KAFKA_CONSUMER_FETCH_MESSAGE_MAX_BYTES | 1048576 | Maximum bytes per message. | Defines the largest individual message the consumer can fetch. |
| --consumer-stats-interval | P_KAFKA_CONSUMER_STATS_INTERVAL | 10000 | Statistics interval in milliseconds. | Defines the frequency at which consumer statistics are collected. |

  • All parameters can be set using command-line arguments or environment variables.
  • Environment variables take precedence over default values.
  • Some values, such as --consumer-group-instance-id, are generated dynamically if not explicitly provided.

For more details, refer to Kafka's official documentation on consumer configurations.
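
A consumer setup based on the table above might look like the following sketch. The topic names are hypothetical; the group ID and buffer size are the documented defaults, and the offset reset is switched to latest only to illustrate overriding a default.

```shell
# Hypothetical topic names; --consumer-topics has no default, so set it explicitly.
export P_KAFKA_CONSUMER_TOPICS="app-logs,audit-logs"
# Documented defaults, written out explicitly.
export P_KAFKA_CONSUMER_GROUP_ID="parseable-connect-cg"
export P_KAFKA_CONSUMER_BUFFER_SIZE="10000"
# Override: read only new messages when the group has no committed offset.
export P_KAFKA_CONSUMER_AUTO_OFFSET_RESET="latest"
```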

Producer Configuration

Note: Producer configuration is not needed at the moment. These settings will be required once DLT is implemented.

| Parameter | Environment Variable | Default Value | Description | Usage |
|---|---|---|---|---|
| --producer-acks | P_KAFKA_PRODUCER_ACKS | all | Number of acknowledgments the producer requires. | Determines when a message is considered successfully sent (0, 1, all). |
| --producer-compression-type | P_KAFKA_PRODUCER_COMPRESSION_TYPE | lz4 | Compression type for messages. | Determines how messages are compressed (none, gzip, snappy, lz4, zstd). |
| --producer-batch-size | P_KAFKA_PRODUCER_BATCH_SIZE | 16384 | Maximum size of a message batch in bytes. | Defines the size of batches sent to Kafka. |
| --producer-linger-ms | P_KAFKA_PRODUCER_LINGER_MS | 5 | Delay to wait for more messages in the same batch. | Controls latency vs. throughput trade-off. |
| --producer-message-timeout-ms | P_KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS | 120000 | Local message timeout. | Time before an unacknowledged message is dropped. |
| --producer-max-inflight | P_KAFKA_PRODUCER_MAX_INFLIGHT | 5 | Maximum number of in-flight requests per connection. | Controls how many requests can be sent without acknowledgment. |
| --producer-message-max-bytes | P_KAFKA_PRODUCER_MESSAGE_MAX_BYTES | 1048576 | Maximum size of a message in bytes. | Restricts the maximum message size that can be sent. |
| --producer-enable-idempotence | P_KAFKA_PRODUCER_ENABLE_IDEMPOTENCE | true | Enable idempotent producer. | Prevents duplicate writes caused by producer retries. |
| --producer-transaction-timeout-ms | P_KAFKA_PRODUCER_TRANSACTION_TIMEOUT_MS | 60000 | Transaction timeout. | Maximum time for a transaction before it times out. |
| --producer-buffer-memory | P_KAFKA_PRODUCER_BUFFER_MEMORY | 33554432 | Total bytes of memory the producer can use. | Limits the memory available for buffering messages. |
| --producer-retry-backoff-ms | P_KAFKA_PRODUCER_RETRY_BACKOFF_MS | 100 | Time to wait before retrying a failed request. | Defines back-off time for retries. |
| --producer-request-timeout-ms | P_KAFKA_PRODUCER_REQUEST_TIMEOUT_MS | 30000 | Time to wait for a response from brokers. | Limits how long the producer waits for broker acknowledgment. |
| --producer-queue-buffering-max-messages | P_KAFKA_PRODUCER_QUEUE_BUFFERING_MAX_MESSAGES | 100000 | Maximum number of messages allowed on the producer queue. | Prevents excessive message buffering. |
| --producer-queue-buffering-max-kbytes | P_KAFKA_PRODUCER_QUEUE_BUFFERING_MAX_KBYTES | 1048576 | Maximum total message size sum allowed on the producer queue. | Restricts the producer queue's total size. |
| --producer-delivery-timeout-ms | P_KAFKA_PRODUCER_DELIVERY_TIMEOUT_MS | 120000 | Maximum time to report success or failure after send. | Defines the upper bound on message delivery time. |
| --producer-max-retries | P_KAFKA_PRODUCER_MAX_RETRIES | 2147483647 | Maximum number of retries per message. | Controls how many times a message is retried before failing. |
| --producer-retry-backoff-max-ms | P_KAFKA_PRODUCER_RETRY_BACKOFF_MAX_MS | 1000 | Maximum back-off time between retries. | Caps how long the producer waits between retries. |

  • All parameters can be set using command-line arguments or environment variables.
  • Environment variables take precedence over default values.
  • Certain parameters, such as --producer-acks and --producer-enable-idempotence, affect message durability and reliability.

For more details, refer to Kafka's official documentation on producer configurations.

Security Configuration

| Parameter | Environment Variable | Default Value | Description | Usage |
|---|---|---|---|---|
| --security-protocol | P_KAFKA_SECURITY_PROTOCOL | PLAINTEXT | Security protocol used for communication. | Determines whether SSL, SASL, or plaintext is used. Options: PLAINTEXT, SSL, SASL_SSL, SASL_PLAINTEXT. |
| --ssl-ca-location | P_KAFKA_SSL_CA_LOCATION | None | CA certificate file path. | Required when using SSL or SASL_SSL. |
| --ssl-certificate-location | P_KAFKA_SSL_CERTIFICATE_LOCATION | None | Client certificate file path. | Required when using SSL or SASL_SSL. |
| --ssl-key-location | P_KAFKA_SSL_KEY_LOCATION | None | Client key file path. | Required when using SSL or SASL_SSL. |
| --ssl-key-password | P_KAFKA_SSL_KEY_PASSWORD | None | SSL key password. | Used if the SSL key is password protected. |
| --sasl-mechanism | P_KAFKA_SASL_MECHANISM | None | SASL authentication mechanism. | Required when using SASL_SSL or SASL_PLAINTEXT (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER). |
| --sasl-username | P_KAFKA_SASL_USERNAME | None | SASL username. | Required for PLAIN or SCRAM SASL mechanisms. |
| --sasl-password | P_KAFKA_SASL_PASSWORD | None | SASL password. | Required for PLAIN or SCRAM SASL mechanisms. |
| --kerberos-service-name | P_KAFKA_KERBEROS_SERVICE_NAME | None | Kerberos service name. | Required when using GSSAPI SASL mechanism. |
| --kerberos-principal | P_KAFKA_KERBEROS_PRINCIPAL | None | Kerberos principal. | Used for Kerberos authentication. |
| --kerberos-keytab | P_KAFKA_KERBEROS_KEYTAB | None | Path to Kerberos keytab file. | Required when using Kerberos authentication. |
| --oauth-token-endpoint | P_KAFKA_OAUTH_TOKEN_ENDPOINT | None | OAuth Bearer token endpoint. | Required when using OAUTHBEARER SASL mechanism. |
| --oauth-client-id | P_KAFKA_OAUTH_CLIENT_ID | None | OAuth client ID. | Used for authentication with an OAuth provider. |
| --oauth-client-secret | P_KAFKA_OAUTH_CLIENT_SECRET | None | OAuth client secret. | Used to authenticate the OAuth client. |
| --oauth-scope | P_KAFKA_OAUTH_SCOPE | None | OAuth scope. | Defines the permissions requested from the OAuth provider. |

Security Configuration Combinations

Plaintext Communication (No Security)

  • --security-protocol=PLAINTEXT
  • No additional parameters required.

SSL Encryption

  • --security-protocol=SSL

Required parameters:

  • --ssl-ca-location
  • --ssl-certificate-location
  • --ssl-key-location
  • --ssl-key-password (if the key is password-protected)

SASL Authentication with SSL

  • --security-protocol=SASL_SSL

Required parameters:

  • --sasl-mechanism
  • --sasl-username and --sasl-password (for PLAIN or SCRAM mechanisms)
  • --kerberos-service-name and --kerberos-principal (for GSSAPI mechanism)
  • --kerberos-keytab (if using Kerberos authentication)

The SSL parameters listed under SSL Encryption are also needed if the security policy requires them.

SASL Authentication without SSL

  • --security-protocol=SASL_PLAINTEXT

Required parameters:

  • --sasl-mechanism
  • --sasl-username and --sasl-password (for PLAIN or SCRAM mechanisms)
  • --kerberos-service-name and --kerberos-principal (for GSSAPI mechanism)
  • --kerberos-keytab (if using Kerberos authentication)

OAuth Bearer Token Authentication (Not supported yet)

  • --security-protocol=SASL_SSL or SASL_PLAINTEXT
  • --sasl-mechanism=OAUTHBEARER

Required parameters:

  • --oauth-token-endpoint
  • --oauth-client-id
  • --oauth-client-secret
  • --oauth-scope (if required by the OAuth provider)

Examples

SSL Configuration

export P_KAFKA_SECURITY_PROTOCOL="SSL"
export P_KAFKA_SSL_CA_LOCATION="/path/to/ca.pem"
export P_KAFKA_SSL_CERTIFICATE_LOCATION="/path/to/client-cert.pem"
export P_KAFKA_SSL_KEY_LOCATION="/path/to/client-key.pem"
export P_KAFKA_SSL_KEY_PASSWORD="my-secure-password"

SASL Configuration

export P_KAFKA_SECURITY_PROTOCOL="SASL_SSL"
export P_KAFKA_SASL_MECHANISM="SCRAM-SHA-512"
export P_KAFKA_SASL_USERNAME="my-user"
export P_KAFKA_SASL_PASSWORD="my-password"

For more details, refer to Kafka's official security documentation.
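
The GSSAPI combination described under SASL Authentication with SSL could be configured along the same lines. This is only a sketch: the service name, principal, and file paths are hypothetical placeholders, and the variable names are taken from the Security Configuration table.

```shell
# All values below are hypothetical placeholders; adjust them to your environment.
export P_KAFKA_SECURITY_PROTOCOL="SASL_SSL"
export P_KAFKA_SASL_MECHANISM="GSSAPI"
export P_KAFKA_KERBEROS_SERVICE_NAME="kafka"
export P_KAFKA_KERBEROS_PRINCIPAL="parseable@EXAMPLE.COM"
export P_KAFKA_KERBEROS_KEYTAB="/path/to/parseable.keytab"
# CA certificate for the TLS layer; add the client certificate and key
# variables as well if your brokers require them.
export P_KAFKA_SSL_CA_LOCATION="/path/to/ca.pem"
```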

Concurrency and Multi-Instance Processing

The connector processes each partition on its own listener thread, up to a configurable number of threads per ingestor:

export P_KAFKA_PARTITION_LISTENER_CONCURRENCY="2"

Thread Assignment Formula:

Threads per ingestor = min(Partitions per ingestor, Configured threads)

Example Scenarios:

Balanced Configuration:

  • 6 partitions
  • 2 ingest nodes
  • 3 threads per node

Result: Each thread handles one partition

Over-threaded Configuration:

  • 4 partitions
  • 2 ingest nodes
  • 4 threads per node

Result: 2 threads per node will be idle
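
The formula and both scenarios can be checked with a small shell helper. This is an illustrative sketch, not part of the connector: the function name active_threads is hypothetical, and it assumes partitions are spread evenly across ingest nodes, rounding up when they do not divide exactly.

```shell
# Active listener threads on one ingestor:
#   min(partitions per ingestor, configured threads)
# Usage: active_threads <partitions> <ingest_nodes> <configured_threads>
active_threads() {
  at_partitions=$1
  at_nodes=$2
  at_configured=$3
  # Partitions per ingestor, rounded up (assumes an even spread).
  at_per_node=$(( (at_partitions + at_nodes - 1) / at_nodes ))
  if [ "$at_per_node" -lt "$at_configured" ]; then
    echo "$at_per_node"
  else
    echo "$at_configured"
  fi
}

active_threads 6 2 3   # balanced: prints 3, every thread has one partition
active_threads 4 2 4   # over-threaded: prints 2, so 2 of 4 threads stay idle
```

Sizing P_KAFKA_PARTITION_LISTENER_CONCURRENCY to the value this returns avoids configuring threads that never receive a partition.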

Consumer Group Rebalancing

Rebalance triggers:

  • New consumer joins
  • Existing consumer leaves
  • Network issues
  • Partition reassignment

Available strategies:

  • Range: Assigns consecutive partitions
  • RoundRobin: Distributes evenly
  • Sticky: Minimizes reassignments (not recommended for Parseable because of rdKafka issues)
  • CooperativeSticky: Controlled rebalance (not recommended for Parseable because rdKafka support is lacking)

Recommended configuration (these are the defaults):

export P_KAFKA_CONSUMER_PARTITION_STRATEGY="roundrobin,range"
export P_KAFKA_CONSUMER_SESSION_TIMEOUT="60000"
export P_KAFKA_CONSUMER_HEARTBEAT_INTERVAL="3000"

Metrics

All metrics listed in librdkafka's statistics documentation are available at the /metrics endpoint.

Architecture and Design

Core Components

  • KafkaStreams: Manages consumers and partitions.
  • StreamWorker: Processes records per partition.
  • ParseableSinkProcessor: Transforms messages and sinks them to Parseable.
  • RebalanceListener: Handles partition rebalancing.
  • Metrics Collector: Provides Prometheus metrics.
  • Security Layer: Configurable authentication.

Error Handling

| Error Type | Handling Strategy |
|---|---|
| Connection Errors | Retries every 1 second |
| Fatal Errors | Stops the pipeline |
| Auth Errors | Stops the pipeline |

Best Practices

Performance Tuning:

  • More partitions allow more parallelism, since each partition is processed by its own thread
  • Use RoundRobin partition assignment
  • Increase the partition count if consumer lag stays high

Security:

  • Prefer SSL or SASL_SSL over SASL_PLAINTEXT, which does not encrypt traffic
  • Rotate certificates regularly
  • Monitor authentication failures