# Source code for kafka.common

import inspect
import sys
from collections import namedtuple

###############
#   Structs   #
###############

# Metadata API request/response structs.
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
MetadataRequest = namedtuple("MetadataRequest", ("topics",))

MetadataResponse = namedtuple("MetadataResponse", ("brokers", "topics"))

# Consumer metadata request/response structs.
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest
ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest", ("groups",))

ConsumerMetadataResponse = namedtuple(
    "ConsumerMetadataResponse",
    ("error", "nodeId", "host", "port"),
)

# Produce API request/response structs.
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
ProduceRequest = namedtuple(
    "ProduceRequest",
    ("topic", "partition", "messages"),
)

ProduceResponse = namedtuple(
    "ProduceResponse",
    ("topic", "partition", "error", "offset"),
)

# Fetch API request/response structs.
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI
FetchRequest = namedtuple(
    "FetchRequest",
    ("topic", "partition", "offset", "max_bytes"),
)

FetchResponse = namedtuple(
    "FetchResponse",
    ("topic", "partition", "error", "highwaterMark", "messages"),
)

# Offset API request/response structs.
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
OffsetRequest = namedtuple(
    "OffsetRequest",
    ("topic", "partition", "time", "max_offsets"),
)

OffsetResponse = namedtuple(
    "OffsetResponse",
    ("topic", "partition", "error", "offsets"),
)

# Offset Commit / Offset Fetch API request/response structs.
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
OffsetCommitRequest = namedtuple(
    "OffsetCommitRequest",
    ("topic", "partition", "offset", "metadata"),
)

OffsetCommitResponse = namedtuple(
    "OffsetCommitResponse",
    ("topic", "partition", "error"),
)

OffsetFetchRequest = namedtuple("OffsetFetchRequest", ("topic", "partition"))

OffsetFetchResponse = namedtuple(
    "OffsetFetchResponse",
    ("topic", "partition", "offset", "metadata", "error"),
)



# Supporting structs that are not tied to one request/response pair.
BrokerMetadata = namedtuple("BrokerMetadata", ("nodeId", "host", "port"))

TopicMetadata = namedtuple("TopicMetadata", ("topic", "error", "partitions"))

PartitionMetadata = namedtuple(
    "PartitionMetadata",
    ("topic", "partition", "leader", "replicas", "isr", "error"),
)

OffsetAndMessage = namedtuple("OffsetAndMessage", ("offset", "message"))

Message = namedtuple("Message", ("magic", "attributes", "key", "value"))

TopicAndPartition = namedtuple("TopicAndPartition", ("topic", "partition"))

KafkaMessage = namedtuple(
    "KafkaMessage",
    ("topic", "partition", "offset", "key", "value"),
)

# Retry policy for the async producer.
# ``limit`` is an int >= 0; a limit of 0 means "do not retry".
RetryOptions = namedtuple(
    "RetryOptions",
    ("limit", "backoff_ms", "retry_on_timeouts"),
)


#################
#   Exceptions  #
#################


class KafkaError(RuntimeError):
    """Root of the exception hierarchy defined in this module."""
class BrokerResponseError(KafkaError):
    """Base class for errors identified by a broker error code.

    Subclasses set two class attributes: ``errno`` (the numeric code) and
    ``message`` (its symbolic name).
    """
class UnknownError(BrokerResponseError):
    """Broker error code -1 (``UNKNOWN``)."""

    errno = -1
    message = 'UNKNOWN'
class OffsetOutOfRangeError(BrokerResponseError):
    """Broker error code 1 (``OFFSET_OUT_OF_RANGE``)."""

    errno = 1
    message = 'OFFSET_OUT_OF_RANGE'
class InvalidMessageError(BrokerResponseError):
    """Broker error code 2 (``INVALID_MESSAGE``)."""

    errno = 2
    message = 'INVALID_MESSAGE'
class UnknownTopicOrPartitionError(BrokerResponseError):
    """Broker error code 3 (``UNKNOWN_TOPIC_OR_PARTITION``)."""

    errno = 3
    # Spelling corrected: the original literal read 'UNKNOWN_TOPIC_OR_PARTITON'.
    message = 'UNKNOWN_TOPIC_OR_PARTITION'
class InvalidFetchRequestError(BrokerResponseError):
    """Broker error code 4 (``INVALID_FETCH_SIZE``)."""

    errno = 4
    message = 'INVALID_FETCH_SIZE'
class LeaderNotAvailableError(BrokerResponseError):
    """Broker error code 5 (``LEADER_NOT_AVAILABLE``)."""

    errno = 5
    message = 'LEADER_NOT_AVAILABLE'
class NotLeaderForPartitionError(BrokerResponseError):
    """Broker error code 6 (``NOT_LEADER_FOR_PARTITION``)."""

    errno = 6
    message = 'NOT_LEADER_FOR_PARTITION'
class RequestTimedOutError(BrokerResponseError):
    """Broker error code 7 (``REQUEST_TIMED_OUT``)."""

    errno = 7
    message = 'REQUEST_TIMED_OUT'
class BrokerNotAvailableError(BrokerResponseError):
    """Broker error code 8 (``BROKER_NOT_AVAILABLE``)."""

    errno = 8
    message = 'BROKER_NOT_AVAILABLE'
class ReplicaNotAvailableError(BrokerResponseError):
    """Broker error code 9 (``REPLICA_NOT_AVAILABLE``)."""

    errno = 9
    message = 'REPLICA_NOT_AVAILABLE'
class MessageSizeTooLargeError(BrokerResponseError):
    """Broker error code 10 (``MESSAGE_SIZE_TOO_LARGE``)."""

    errno = 10
    message = 'MESSAGE_SIZE_TOO_LARGE'
class StaleControllerEpochError(BrokerResponseError):
    """Broker error code 11 (``STALE_CONTROLLER_EPOCH``)."""

    errno = 11
    message = 'STALE_CONTROLLER_EPOCH'
class OffsetMetadataTooLargeError(BrokerResponseError):
    """Broker error code 12 (``OFFSET_METADATA_TOO_LARGE``)."""

    errno = 12
    message = 'OFFSET_METADATA_TOO_LARGE'
class StaleLeaderEpochCodeError(BrokerResponseError):
    """Broker error code 13 (``STALE_LEADER_EPOCH_CODE``)."""

    errno = 13
    message = 'STALE_LEADER_EPOCH_CODE'
class OffsetsLoadInProgressCode(BrokerResponseError):
    """Broker error code 14 (``OFFSETS_LOAD_IN_PROGRESS_CODE``)."""

    errno = 14
    message = 'OFFSETS_LOAD_IN_PROGRESS_CODE'
class ConsumerCoordinatorNotAvailableCode(BrokerResponseError):
    """Broker error code 15 (``CONSUMER_COORDINATOR_NOT_AVAILABLE_CODE``)."""

    errno = 15
    message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE_CODE'
class NotCoordinatorForConsumerCode(BrokerResponseError):
    """Broker error code 16 (``NOT_COORDINATOR_FOR_CONSUMER_CODE``)."""

    errno = 16
    message = 'NOT_COORDINATOR_FOR_CONSUMER_CODE'
class KafkaUnavailableError(KafkaError):
    """Marker subclass of ``KafkaError``; adds no attributes or behavior."""
class KafkaTimeoutError(KafkaError):
    """Marker subclass of ``KafkaError``; adds no attributes or behavior."""
class FailedPayloadsError(KafkaError):
    """``KafkaError`` that records the payload it was raised for.

    Args:
        payload: stored unchanged on ``self.payload``.
        *args: forwarded to the ``KafkaError`` constructor.
    """

    def __init__(self, payload, *args):
        # ``payload`` is deliberately kept out of ``self.args``; only the
        # remaining positional arguments reach the base exception.
        super(FailedPayloadsError, self).__init__(*args)
        self.payload = payload
class ConnectionError(KafkaError):
    """Marker subclass of ``KafkaError``; adds no attributes or behavior.

    NOTE: on Python 3 this name shadows the builtin ``ConnectionError``
    within this module and in any module that imports it by name. The two
    classes are unrelated.
    """
class BufferUnderflowError(KafkaError):
    """Marker subclass of ``KafkaError``; adds no attributes or behavior."""
class ChecksumError(KafkaError):
    """Marker subclass of ``KafkaError``; adds no attributes or behavior."""
class ConsumerFetchSizeTooSmall(KafkaError):
    """Marker subclass of ``KafkaError``; adds no attributes or behavior."""
class ConsumerNoMoreData(KafkaError):
    """Marker subclass of ``KafkaError``; adds no attributes or behavior."""
class ConsumerTimeout(KafkaError):
    """Marker subclass of ``KafkaError``; adds no attributes or behavior."""
class ProtocolError(KafkaError):
    """Marker subclass of ``KafkaError``; adds no attributes or behavior."""
class UnsupportedCodecError(KafkaError):
    """Marker subclass of ``KafkaError``; adds no attributes or behavior."""
class KafkaConfigurationError(KafkaError):
    """Marker subclass of ``KafkaError``; adds no attributes or behavior."""
class AsyncProducerQueueFull(KafkaError):
    """``KafkaError`` that records the messages that could not be handled.

    Args:
        failed_msgs: stored unchanged on ``self.failed_msgs``.
        *args: forwarded to the ``KafkaError`` constructor.
    """

    def __init__(self, failed_msgs, *args):
        # ``failed_msgs`` is deliberately kept out of ``self.args``; only the
        # remaining positional arguments reach the base exception.
        super(AsyncProducerQueueFull, self).__init__(*args)
        self.failed_msgs = failed_msgs
def _iter_broker_errors():
    """Yield each ``BrokerResponseError`` subclass found in this module.

    Only names present in the module namespace at call time are seen, and
    ``BrokerResponseError`` itself is excluded.
    """
    for _name, obj in inspect.getmembers(sys.modules[__name__]):
        if not inspect.isclass(obj):
            continue
        # Identity check: skip the base class, keep everything derived from it.
        if obj is not BrokerResponseError and issubclass(obj, BrokerResponseError):
            yield obj


# Lookup table: broker error code (``errno``) -> exception class.
kafka_errors = dict((err.errno, err) for err in _iter_broker_errors())
def check_error(response):
    """Raise the exception that ``response`` represents, if any.

    Args:
        response: either an ``Exception`` instance, or an object exposing
            an ``error`` attribute that holds a broker error code.

    Returns:
        None when ``response`` is not an exception and ``response.error``
        is falsy.

    Raises:
        Exception: ``response`` itself, when it is an exception instance.
        BrokerResponseError: when ``response.error`` is truthy. The class
            is looked up in ``kafka_errors`` (``UnknownError`` for codes
            that are not registered) and constructed with ``response`` as
            its only argument.
    """
    if isinstance(response, Exception):
        raise response
    if response.error:
        error_class = kafka_errors.get(response.error, UnknownError)
        raise error_class(response)
# Error-type groups used for retry decisions.
# NOTE(review): the names suggest "back off before retrying" versus "refresh
# metadata before retrying" -- confirm against the code that consumes them.
RETRY_BACKOFF_ERROR_TYPES = (
    KafkaUnavailableError,
    LeaderNotAvailableError,
    ConnectionError,
    FailedPayloadsError,
)

RETRY_REFRESH_ERROR_TYPES = (
    NotLeaderForPartitionError,
    UnknownTopicOrPartitionError,
    LeaderNotAvailableError,
    ConnectionError,
)

# Plain concatenation: LeaderNotAvailableError and ConnectionError are in both
# groups and therefore appear twice here.
RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES