Extension chaoskafka

Version 0.1.3
Repository https://github.com/friki-io/chaostoolkit-kafka


This project contains Chaos Toolkit activities to create Kafka chaos experiments.

Install

This package requires Python 3.8+

To be used from your experiment, this package must be installed in the Python environment where chaostoolkit already lives.

$ pip install chaostoolkit-kafka

Usage

A typical experiment using this extension would look like this:

{
  "title": "Kafka Consumer Lag and Corruption Chaos Experiment",
  "description": "This experiment simulates a Kafka chaos scenario by producing a corrupt message and monitoring whether the consumer group experiences lag exceeding a specified threshold.",
  "configuration": {
    "bootstrap_servers": {
      "type": "env",
      "key": "BOOTSTRAP_SERVERS"
    },
    "topic": "poke-orders",
    "group_id": "poke-order-consumer"
  },
  "steady-state-hypothesis": {
    "title": "Check if the consumer group has lag under threshold",
    "probes": [
      {
        "name": "Consumer group has lag under the threshold",
        "type": "probe",
        "tolerance": true,
        "provider": {
          "type": "python",
          "module": "chaoskafka.probes",
          "func": "check_consumer_lag_under_threshold",
          "arguments": {
            "bootstrap_servers": "${bootstrap_servers}",
            "group_id": "${group_id}",
            "topic": "${topic}",
            "threshold": 15,
            "partition": 1
          }
        }
      }
    ]
  },
  "method": [
    {
      "type": "action",
      "name": "Produce corrupt Kafka message",
      "provider": {
        "type": "python",
        "module": "chaoskafka.actions",
        "func": "produce_messages",
        "arguments": {
          "bootstrap_servers": "${bootstrap_servers}",
          "topic": "${topic}",
          "partition": 1,
          "messages": ["corrupted_message"]
        }
      },
      "pauses": {
        "after": 120
      },
      "controls": [
        {
          "name": "calculate offsets and num_messages",
          "provider": {
            "type": "python",
            "module": "chaoskafka.controls.get_production_offsets"
          }
        }
      ]
    }
  ],
  "rollbacks": [
    {
      "type": "action",
      "name": "Manually Consume Unprocessable Kafka Message",
      "provider": {
        "type": "python",
        "module": "chaoskafka.actions",
        "func": "consume_messages",
        "arguments": {
          "bootstrap_servers": "${bootstrap_servers}",
          "topic": "${topic}",
          "group_id": "${group_id}",
          "partition": 1,
          "offset": "${earliest}",
          "num_messages": "${num_messages}"
        }
      }
    }
  ]
}

That’s it!

Please explore the code to see existing probes and actions.
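
The `${bootstrap_servers}`, `${topic}`, and `${group_id}` placeholders in the experiment above refer to entries in its `configuration` block, and the `env` entry is read from the `BOOTSTRAP_SERVERS` environment variable. The following is a simplified sketch of that lookup, not the Chaos Toolkit implementation: the helper names `resolve_configuration` and `substitute_arguments` are hypothetical, and Python's `string.Template` is used only because it happens to share the `${name}` syntax.

```python
from string import Template


def resolve_configuration(configuration, environ):
    """Turn the experiment's configuration block into plain values."""
    resolved = {}
    for name, value in configuration.items():
        if isinstance(value, dict) and value.get("type") == "env":
            # An "env" entry names the environment variable to read.
            resolved[name] = environ[value["key"]]
        else:
            resolved[name] = value
    return resolved


def substitute_arguments(arguments, resolved):
    """Replace ${name} placeholders in string arguments; leave other types alone."""
    return {
        key: Template(value).substitute(resolved) if isinstance(value, str) else value
        for key, value in arguments.items()
    }


configuration = {
    "bootstrap_servers": {"type": "env", "key": "BOOTSTRAP_SERVERS"},
    "topic": "poke-orders",
    "group_id": "poke-order-consumer",
}
probe_arguments = {
    "bootstrap_servers": "${bootstrap_servers}",
    "group_id": "${group_id}",
    "topic": "${topic}",
    "threshold": 15,
    "partition": 1,
}

# A stand-in environment; in a real run this would be os.environ.
environ = {"BOOTSTRAP_SERVERS": "localhost:9092"}
resolved_arguments = substitute_arguments(
    probe_arguments, resolve_configuration(configuration, environ)
)
print(resolved_arguments)
```

Non-string arguments such as `threshold` and `partition` pass through unchanged in this sketch.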

Test

To run the tests for the project execute the following:

$ pdm run test

Formatting and Linting

We use ruff to both lint and format this repository's code.

Before raising a Pull Request, we recommend you run formatting against your code with:

$ pdm run format

This will automatically format any code that doesn’t adhere to the formatting standards.

As some things are not picked up by the formatting, we also recommend you run:

$ pdm run lint

This ensures that unused import statements, overly long strings, and similar issues are also picked up.

Contribute

If you wish to contribute more functions to this package, you are more than welcome to do so. Please fork this project, make your changes following the usual PEP 8 code style, add tests, and submit a PR for review.

Exported Activities

actions


consume_messages

Type action
Module chaoskafka.actions
Name consume_messages
Return list

Consume messages from a Kafka topic.

This function consumes a specified number of messages from a Kafka topic, starting from a given offset in a specific partition. It spawns multiple threads, each running a consumer with a different client ID, so that together they can stand in for the members of the same consumer group. For more information on how partitions are assigned within a group, see https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html

Args:
    bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers.
    group_id (str): The consumer group ID.
    topic (str): The name of the Kafka topic to consume messages from.
    partition (int): The partition within the topic to consume messages from.
    offset (int): The starting offset to consume messages from.
    num_messages (int): The number of messages to consume.

Returns: List[dict]: A list of consumed messages.

Raises: FailedActivity: If the topic is None, the partition does not exist, or if there is an issue consuming the messages.

Signature:

def consume_messages(bootstrap_servers: str = None,
                     group_id: str = 'chaostoolkit',
                     topic: str = None,
                     partition: int = 0,
                     offset: int = 0,
                     num_messages: int = 1,
                     configuration: Dict[str, Dict[str, str]] = None,
                     secrets: Dict[str, Dict[str, str]] = None) -> List[dict]:
    pass

Arguments:

Name               Type     Default         Required
bootstrap_servers  string   null            No
group_id           string   "chaostoolkit"  No
topic              string   null            No
partition          integer  0               No
offset             integer  0               No
num_messages       integer  1               No

Usage:

{
  "name": "consume-messages",
  "type": "action",
  "provider": {
    "type": "python",
    "module": "chaoskafka.actions",
    "func": "consume_messages"
  }
}
name: consume-messages
provider:
  func: consume_messages
  module: chaoskafka.actions
  type: python
type: action
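
The generated snippet above omits `arguments`, so the action would run with the defaults listed in the table, including a `null` topic, which the docstring says raises `FailedActivity`. As a sketch, the activity can be assembled in Python and serialized to JSON. The helper name `python_activity` and the argument values are illustrative assumptions; the keys mirror the experiment shown under Usage.

```python
import json


def python_activity(name, activity_type, module, func, arguments=None):
    """Build a Chaos Toolkit activity that uses the python provider."""
    provider = {"type": "python", "module": module, "func": func}
    if arguments:
        provider["arguments"] = arguments
    return {"name": name, "type": activity_type, "provider": provider}


consume = python_activity(
    name="consume-messages",
    activity_type="action",
    module="chaoskafka.actions",
    func="consume_messages",
    arguments={
        "bootstrap_servers": "${bootstrap_servers}",
        "group_id": "${group_id}",
        "topic": "${topic}",
        "partition": 0,
        "offset": 0,
        "num_messages": 5,  # illustrative value
    },
)
activity_json = json.dumps(consume, indent=2)
print(activity_json)
```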

delete_consumer_group

Type action
Module chaoskafka.actions
Name delete_consumer_group
Return boolean

Delete a Kafka consumer group.

This function deletes a specified Kafka consumer group from the cluster. It uses the Kafka AdminClient to issue the delete operation.

Args:
    bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers.
    group_id (str): The ID of the consumer group to delete.

Returns: bool: True if the consumer group was successfully deleted, False otherwise.

Raises: FailedActivity: If there is an issue deleting the consumer group.

Note: To delete a consumer group, make sure it has no members.

Signature:

def delete_consumer_group(bootstrap_servers: str = None,
                          group_id: str = None,
                          configuration: Dict[str, Dict[str, str]] = None,
                          secrets: Dict[str, Dict[str, str]] = None) -> bool:
    pass

Arguments:

Name               Type    Default  Required
bootstrap_servers  string  null     No
group_id           string  null     No

Usage:

{
  "name": "delete-consumer-group",
  "type": "action",
  "provider": {
    "type": "python",
    "module": "chaoskafka.actions",
    "func": "delete_consumer_group"
  }
}
name: delete-consumer-group
provider:
  func: delete_consumer_group
  module: chaoskafka.actions
  type: python
type: action

delete_kafka_topic

Type action
Module chaoskafka.actions
Name delete_kafka_topic
Return boolean

Delete a Kafka topic.

This function deletes a specified Kafka topic from the cluster. It uses the Kafka AdminClient to issue the delete operation.

Args:
    bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers.
    topic (str): The name of the Kafka topic to delete.

Returns: bool: True if the topic was successfully deleted, False otherwise.

Raises: FailedActivity: If there is an issue deleting the topic.

Signature:

def delete_kafka_topic(bootstrap_servers: str = None,
                       topic: str = None,
                       configuration: Dict[str, Dict[str, str]] = None,
                       secrets: Dict[str, Dict[str, str]] = None) -> bool:
    pass

Arguments:

Name               Type    Default  Required
bootstrap_servers  string  null     No
topic              string  null     No

Usage:

{
  "name": "delete-kafka-topic",
  "type": "action",
  "provider": {
    "type": "python",
    "module": "chaoskafka.actions",
    "func": "delete_kafka_topic"
  }
}
name: delete-kafka-topic
provider:
  func: delete_kafka_topic
  module: chaoskafka.actions
  type: python
type: action

produce_messages

Type action
Module chaoskafka.actions
Name produce_messages
Return list

Produce messages to a Kafka topic.

This function sends a list of messages to a specified Kafka topic using the Kafka Producer. Each message is sent to a specified partition within the topic.

Args:
    bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers.
    client_id (str): The client ID for the producer.
    topic (str): The name of the Kafka topic to produce messages to.
    messages (List[str]): A list of messages to be sent to the topic.
    partition (int): The partition within the topic to send messages to.

Returns: List[Dict]: A list of dictionaries containing the result of each message produced, with either an "error" key or a "message" key.

Raises: FailedActivity: If the topic is None or if there is an issue producing the messages.

Signature:

def produce_messages(bootstrap_servers: str = None,
                     client_id: str = 'chaostoolkit',
                     topic: str = None,
                     messages: List[str] = [],
                     partition: int = 0,
                     configuration: Dict[str, Dict[str, str]] = None,
                     secrets: Dict[str, Dict[str, str]] = None) -> List[dict]:
    pass

Arguments:

Name               Type     Default         Required
bootstrap_servers  string   null            No
client_id          string   "chaostoolkit"  No
topic              string   null            No
messages           list     []              No
partition          integer  0               No

Usage:

{
  "name": "produce-messages",
  "type": "action",
  "provider": {
    "type": "python",
    "module": "chaoskafka.actions",
    "func": "produce_messages"
  }
}
name: produce-messages
provider:
  func: produce_messages
  module: chaoskafka.actions
  type: python
type: action
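
Since each entry in the returned list carries either an "error" key or a "message" key, a caller can separate failed deliveries from successful ones. The sketch below assumes only that shape. The sample `results` list and the helper name `split_results` are made up for illustration, and the values stored under each key in a real run may differ.

```python
from typing import Dict, List, Tuple


def split_results(results: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
    """Split produce results into (delivered, failed) by which key is present."""
    delivered = [r for r in results if "message" in r]
    failed = [r for r in results if "error" in r]
    return delivered, failed


# Hypothetical return value, shaped as the docstring describes.
results = [
    {"message": "corrupted_message"},
    {"error": "delivery failed"},
]
delivered, failed = split_results(results)
print(f"{len(delivered)} delivered, {len(failed)} failed")
```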

rebalance_consumer_group

Type action
Module chaoskafka.actions
Name rebalance_consumer_group
Return boolean

Rebalance a Kafka consumer group.

This function forces a rebalance of a specified Kafka consumer group by subscribing to a given topic, waiting for a short period, and then closing the consumer. This action triggers the consumer group to rebalance.

Args:
    bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers.
    topic (str): The name of the Kafka topic to subscribe to.
    group_id (str): The consumer group ID.

Returns: bool: True if the rebalance was successfully triggered, False otherwise.

Raises: FailedActivity: If the topic is None or if there is an issue rebalancing the consumer group.

Signature:

def rebalance_consumer_group(
        bootstrap_servers: str = None,
        topic: str = None,
        group_id: str = None,
        configuration: Dict[str, Dict[str, str]] = None,
        secrets: Dict[str, Dict[str, str]] = None) -> bool:
    pass

Arguments:

Name               Type    Default  Required
bootstrap_servers  string  null     No
topic              string  null     No
group_id           string  null     No

Usage:

{
  "name": "rebalance-consumer-group",
  "type": "action",
  "provider": {
    "type": "python",
    "module": "chaoskafka.actions",
    "func": "rebalance_consumer_group"
  }
}
name: rebalance-consumer-group
provider:
  func: rebalance_consumer_group
  module: chaoskafka.actions
  type: python
type: action
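
The subscribe, wait, close sequence described above can be sketched as follows. This is an assumption-laden illustration rather than the extension's code: it assumes the confluent-kafka client (its `Consumer` with `subscribe`, `poll`, and `close`), and the function name `trigger_rebalance`, the `wait_seconds` default, and the `consumer_factory` parameter are hypothetical. The factory parameter and the `RecordingConsumer` stand-in exist only so the flow can be exercised without a broker.

```python
import time


def trigger_rebalance(bootstrap_servers, topic, group_id,
                      wait_seconds=3.0, consumer_factory=None):
    """Join the group briefly, then leave, so the group has to rebalance."""
    if consumer_factory is None:
        # Assumed client library; imported lazily so the dry run needs no install.
        from confluent_kafka import Consumer
        consumer_factory = Consumer
    consumer = consumer_factory(
        {"bootstrap.servers": bootstrap_servers, "group.id": group_id}
    )
    try:
        consumer.subscribe([topic])
        deadline = time.monotonic() + wait_seconds
        while time.monotonic() < deadline:
            consumer.poll(0.5)  # polling gives the client a chance to join the group
    finally:
        consumer.close()  # leaving the group prompts another rebalance
    return True


class RecordingConsumer:
    """Broker-free stand-in that only records which methods were called."""
    calls = []

    def __init__(self, conf):
        self.calls.append("init")

    def subscribe(self, topics):
        self.calls.append("subscribe")

    def poll(self, timeout):
        self.calls.append("poll")

    def close(self):
        self.calls.append("close")


# Dry run with no waiting, so poll() is never reached.
ok = trigger_rebalance("localhost:9092", "poke-orders", "poke-order-consumer",
                       wait_seconds=0, consumer_factory=RecordingConsumer)
print(ok, RecordingConsumer.calls)
```

Closing the consumer in a `finally` block means the temporary member leaves the group even if subscribing or polling fails.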

probes


all_replicas_in_sync

Type probe
Module chaoskafka.probes
Name all_replicas_in_sync
Return boolean

Check if all replicas for each partition of a Kafka topic are in sync with the leader.

This function verifies the health of a specified Kafka topic by ensuring that all replicas for each partition are in sync with the leader. If all replicas are in sync, the topic is considered healthy.

Args:
    bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers.
    topic (str): The name of the Kafka topic to check.

Returns: bool: True if all replicas for each partition are in sync with the leader, False otherwise.

Raises: FailedActivity: If a Kafka exception is raised.

Signature:

def all_replicas_in_sync(bootstrap_servers: str = None,
                         topic: str = None,
                         configuration: Dict[str, Dict[str, str]] = None,
                         secrets: Dict[str, Dict[str, str]] = None) -> bool:
    pass

Arguments:

Name               Type    Default  Required
bootstrap_servers  string  null     No
topic              string  null     No

Usage:

{
  "name": "all-replicas-in-sync",
  "type": "probe",
  "provider": {
    "type": "python",
    "module": "chaoskafka.probes",
    "func": "all_replicas_in_sync"
  }
}
name: all-replicas-in-sync
provider:
  func: all_replicas_in_sync
  module: chaoskafka.probes
  type: python
type: probe

check_consumer_lag_under_threshold

Type probe
Module chaoskafka.probes
Name check_consumer_lag_under_threshold
Return boolean

Check if the consumer lag is under a certain threshold for a specific partition or all partitions.

This function checks the consumer lag for a specified Kafka topic and consumer group, and verifies if the lag is under a defined threshold. It can check the lag for either a specific partition or all partitions.

Args:
    bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers.
    group_id (str): The consumer group ID.
    topic (str): The name of the Kafka topic.
    threshold (int): The maximum allowable lag threshold.
    partition (Optional[int]): The specific partition to check. If None, checks all partitions.

Returns: bool: True if the consumer lag is under the threshold for the specified partition or all partitions, False otherwise.

Raises: FailedActivity: If there is an issue accessing the cluster metadata.

Signature:

def check_consumer_lag_under_threshold(
        bootstrap_servers: str = None,
        group_id: str = None,
        topic: str = None,
        threshold: int = 0,
        partition: Optional[int] = None,
        configuration: Dict[str, Dict[str, str]] = None,
        secrets: Dict[str, Dict[str, str]] = None) -> bool:
    pass

Arguments:

Name               Type     Default  Required
bootstrap_servers  string   null     No
group_id           string   null     No
topic              string   null     No
threshold          integer  0        No
partition          object   null     No

Usage:

{
  "name": "check-consumer-lag-under-threshold",
  "type": "probe",
  "provider": {
    "type": "python",
    "module": "chaoskafka.probes",
    "func": "check_consumer_lag_under_threshold"
  }
}
name: check-consumer-lag-under-threshold
provider:
  func: check_consumer_lag_under_threshold
  module: chaoskafka.probes
  type: python
type: probe
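
Consumer lag for a partition is conventionally the gap between the partition's latest offset (high watermark) and the group's committed offset. The sketch below applies the threshold check to plain dictionaries, for one partition or for all of them. The function name `lag_under_threshold`, the input shape, and the sample offsets are assumptions for illustration, and whether the real probe compares with `<` or `<=` is not stated in the docstring.

```python
from typing import Dict, Optional


def lag_under_threshold(
    high_watermarks: Dict[int, int],
    committed: Dict[int, int],
    threshold: int,
    partition: Optional[int] = None,
) -> bool:
    """Return True if lag is below the threshold on the chosen partition(s)."""
    partitions = [partition] if partition is not None else list(high_watermarks)
    for p in partitions:
        # Lag = messages written to the partition that the group has not committed yet.
        lag = high_watermarks[p] - committed.get(p, 0)
        if lag >= threshold:  # the strict "<" comparison is an assumption of this sketch
            return False
    return True


# Hypothetical offsets per partition id.
high_watermarks = {0: 120, 1: 300}
committed = {0: 118, 1: 270}

print(lag_under_threshold(high_watermarks, committed, threshold=15, partition=0))  # True
print(lag_under_threshold(high_watermarks, committed, threshold=15))               # False
```

With these sample offsets, partition 0 lags by 2 messages and partition 1 by 30, so the all-partitions check fails on partition 1.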

cluster_doesnt_have_under_replicated_partitions

Type probe
Module chaoskafka.probes
Name cluster_doesnt_have_under_replicated_partitions
Return boolean

Check that the Kafka cluster has no under-replicated partitions.

This function verifies the health of the entire Kafka cluster by checking if any partition is under-replicated. A partition is considered under-replicated if the number of in-sync replicas (ISRs) is less than the total number of replicas.

Args: bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers.

Returns: bool: True if no partition in the cluster is under-replicated, False otherwise.

Raises: KafkaException: If there is an underlying Kafka-related error.

Signature:

def cluster_doesnt_have_under_replicated_partitions(
        bootstrap_servers: str = None,
        configuration: Dict[str, Dict[str, str]] = None,
        secrets: Dict[str, Dict[str, str]] = None) -> bool:
    pass

Arguments:

Name               Type    Default  Required
bootstrap_servers  string  null     No

Usage:

{
  "name": "cluster-doesnt-have-under-replicated-partitions",
  "type": "probe",
  "provider": {
    "type": "python",
    "module": "chaoskafka.probes",
    "func": "cluster_doesnt_have_under_replicated_partitions"
  }
}
name: cluster-doesnt-have-under-replicated-partitions
provider:
  func: cluster_doesnt_have_under_replicated_partitions
  module: chaoskafka.probes
  type: python
type: probe
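
The under-replication rule stated above (fewer in-sync replicas than replicas) can be expressed over plain metadata. In this sketch the `Partition` dataclass, the function name `under_replicated`, and the sample cluster are hypothetical stand-ins for whatever metadata objects the client library returns.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Partition:
    id: int
    replicas: List[int]  # broker ids hosting a replica
    isrs: List[int]      # broker ids currently in sync with the leader


def under_replicated(cluster: Dict[str, List[Partition]]) -> List[Tuple[str, int]]:
    """List (topic, partition id) pairs whose ISR list is shorter than the replica list."""
    return [
        (topic, p.id)
        for topic, partitions in cluster.items()
        for p in partitions
        if len(p.isrs) < len(p.replicas)
    ]


# Hypothetical cluster: one topic, where partition 1 has lost broker 2 from its ISR.
cluster = {
    "poke-orders": [
        Partition(0, replicas=[1, 2, 3], isrs=[1, 2, 3]),
        Partition(1, replicas=[1, 2, 3], isrs=[1, 3]),
    ],
}
offenders = under_replicated(cluster)
print(offenders)            # [('poke-orders', 1)]
print(len(offenders) == 0)  # False: the cluster has an under-replicated partition
```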

describe_kafka_topic

Type probe
Module chaoskafka.probes
Name describe_kafka_topic
Return mapping

Describe a Kafka topic and its partitions.

This function retrieves metadata for a specified Kafka topic, including the number of partitions and detailed information about each partition. It first checks if the topic exists in the Kafka cluster.

Args:
    bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers.
    topic (str): The name of the Kafka topic to describe.

Returns: Dict: A dictionary containing detailed information about the topic and its partitions.

Raises: FailedActivity: If a Kafka exception is raised.

Signature:

def describe_kafka_topic(bootstrap_servers: str = None,
                         topic: str = None,
                         configuration: Dict[str, Dict[str, str]] = None,
                         secrets: Dict[str, Dict[str, str]] = None) -> Dict:
    pass

Arguments:

Name               Type    Default  Required
bootstrap_servers  string  null     No
topic              string  null     No

Usage:

{
  "name": "describe-kafka-topic",
  "type": "probe",
  "provider": {
    "type": "python",
    "module": "chaoskafka.probes",
    "func": "describe_kafka_topic"
  }
}
name: describe-kafka-topic
provider:
  func: describe_kafka_topic
  module: chaoskafka.probes
  type: python
type: probe

topic_has_no_offline_partitions

Type probe
Module chaoskafka.probes
Name topic_has_no_offline_partitions
Return boolean

Check if a Kafka topic has no offline partitions.

This function verifies the health of a specified Kafka topic by checking that all its partitions have an active leader and at least one in-sync replica (ISR). If any partition is offline, the topic is not considered healthy.

Args:
    bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers.
    topic (str): The name of the Kafka topic to check.

Returns: bool: True if all partitions are online and don’t have offline replicas, False otherwise.

Raises: FailedActivity: If a Kafka exception is raised.

Signature:

def topic_has_no_offline_partitions(
        bootstrap_servers: str = None,
        topic: str = None,
        configuration: Dict[str, Dict[str, str]] = None,
        secrets: Dict[str, Dict[str, str]] = None) -> bool:
    pass

Arguments:

Name               Type    Default  Required
bootstrap_servers  string  null     No
topic              string  null     No

Usage:

{
  "name": "topic-has-no-offline-partitions",
  "type": "probe",
  "provider": {
    "type": "python",
    "module": "chaoskafka.probes",
    "func": "topic_has_no_offline_partitions"
  }
}
name: topic-has-no-offline-partitions
provider:
  func: topic_has_no_offline_partitions
  module: chaoskafka.probes
  type: python
type: probe
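
As described above, a partition counts as online when it has an active leader and at least one in-sync replica. A minimal sketch of that rule follows. The dictionary shape and the function name `offline_partitions` are hypothetical, and the use of `-1` to mean "no leader" is an assumption borrowed from how Kafka clients commonly report a leaderless partition.

```python
from typing import Dict, List


def offline_partitions(partitions: List[Dict]) -> List[int]:
    """Return ids of partitions with no leader or an empty ISR list."""
    return [
        p["id"]
        for p in partitions
        if p["leader"] == -1 or not p["isrs"]
    ]


# Hypothetical partition metadata for one topic.
topic_partitions = [
    {"id": 0, "leader": 2, "isrs": [2, 3]},
    {"id": 1, "leader": -1, "isrs": []},  # leaderless: treated as offline
]
offline = offline_partitions(topic_partitions)
print(offline)      # [1]
print(not offline)  # False: the topic would not be considered healthy
```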