Extension chaoskafka
Version | 0.1.3 |
Repository | https://github.com/friki-io/chaostoolkit-kafka |
This project contains Chaos Toolkit activities for creating Kafka chaos experiments.
Install
This package requires Python 3.8+
To be used from your experiment, this package must be installed in the Python environment where chaostoolkit already lives.
$ pip install chaostoolkit-kafka
Usage
A typical experiment using this extension would look like this:
{
  "title": "Kafka Consumer Lag and Corruption Chaos Experiment",
  "description": "This experiment simulates a Kafka chaos scenario by producing a corrupt message and monitoring whether the consumer group experiences lag exceeding a specified threshold.",
  "configuration": {
    "bootstrap_servers": {
      "type": "env",
      "key": "BOOTSTRAP_SERVERS"
    },
    "topic": "poke-orders",
    "group_id": "poke-order-consumer"
  },
  "steady-state-hypothesis": {
    "title": "Check if the consumer group has lag under threshold",
    "probes": [
      {
        "name": "Consumer group has lag under the threshold",
        "type": "probe",
        "tolerance": true,
        "provider": {
          "type": "python",
          "module": "chaoskafka.probes",
          "func": "check_consumer_lag_under_threshold",
          "arguments": {
            "bootstrap_servers": "${bootstrap_servers}",
            "group_id": "${group_id}",
            "topic": "${topic}",
            "threshold": 15,
            "partition": 1
          }
        }
      }
    ]
  },
  "method": [
    {
      "type": "action",
      "name": "Produce corrupt Kafka message",
      "provider": {
        "type": "python",
        "module": "chaoskafka.actions",
        "func": "produce_messages",
        "arguments": {
          "bootstrap_servers": "${bootstrap_servers}",
          "topic": "${topic}",
          "partition": 1,
          "messages": ["corrupted_message"]
        }
      },
      "pauses": {
        "after": 120
      },
      "controls": [
        {
          "name": "calculate offsets and num_messages",
          "provider": {
            "type": "python",
            "module": "chaoskafka.controls.get_production_offsets"
          }
        }
      ]
    }
  ],
  "rollbacks": [
    {
      "type": "action",
      "name": "Manually Consume Unprocessable Kafka Message",
      "provider": {
        "type": "python",
        "module": "chaoskafka.actions",
        "func": "consume_messages",
        "arguments": {
          "bootstrap_servers": "${bootstrap_servers}",
          "topic": "${topic}",
          "group_id": "${group_id}",
          "partition": 1,
          "offset": "${earliest}",
          "num_messages": "${num_messages}"
        }
      }
    }
  ]
}
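Assuming the experiment above is saved as experiment.json, it can be run with the Chaos Toolkit CLI:
$ chaos run experiment.json
Note that the get_production_offsets control declared in the method is what appears to populate the ${earliest} and ${num_messages} substitutions consumed by the rollback.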
That’s it!
Please explore the code to see existing probes and actions.
Test
To run the tests for the project, execute the following:
$ pdm run test
Formatting and Linting
We use ruff to both lint and format this repository's code.
Before raising a Pull Request, we recommend you run formatting against your code with:
$ pdm run format
This will automatically format any code that doesn’t adhere to the formatting standards.
As some issues are not picked up by the formatter, we also recommend you run:
$ pdm run lint
to catch anything the formatter misses, such as unused import statements and overly long strings.
Contribute
If you wish to contribute more functions to this package, you are more than welcome to do so. Please fork this project, make your changes following the usual PEP 8 code style, sprinkle them with tests, and submit a PR for review.
Exported Activities
actions
consume_messages
Type | action |
Module | chaoskafka.actions |
Name | consume_messages |
Return | list |
Consume messages from a Kafka topic.
This function consumes a specified number of messages from a Kafka topic, starting from a given offset in a specific partition. It spawns multiple threads with different client IDs that all join the same consumer group. For more information, see https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html
Args: bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers. group_id (str): The consumer group ID. topic (str): The name of the Kafka topic to consume messages from. partition (int): The partition within the topic to consume messages from. offset (int): The starting offset to consume messages from. num_messages (int): The number of messages to consume.
Returns: List[dict]: A list of consumed messages.
Raises: FailedActivity: If the topic is None, the partition does not exist, or if there is an issue consuming the messages.
Signature:
def consume_messages(bootstrap_servers: str = None,
group_id: str = 'chaostoolkit',
topic: str = None,
partition: int = 0,
offset: int = 0,
num_messages: int = 1,
configuration: Dict[str, Dict[str, str]] = None,
secrets: Dict[str, Dict[str, str]] = None) -> List[dict]:
pass
Arguments:
Name | Type | Default | Required |
---|---|---|---|
bootstrap_servers | string | null | No |
group_id | string | “chaostoolkit” | No |
topic | string | null | No |
partition | integer | 0 | No |
offset | integer | 0 | No |
num_messages | integer | 1 | No |
Usage:
{
  "name": "consume-messages",
  "type": "action",
  "provider": {
    "type": "python",
    "module": "chaoskafka.actions",
    "func": "consume_messages"
  }
}
name: consume-messages
provider:
  func: consume_messages
  module: chaoskafka.actions
  type: python
type: action
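As a sketch, the same action with its arguments supplied, reusing the configuration substitutions from the experiment above (partition, offset, and message count are illustrative):
{
  "name": "consume-messages",
  "type": "action",
  "provider": {
    "type": "python",
    "module": "chaoskafka.actions",
    "func": "consume_messages",
    "arguments": {
      "bootstrap_servers": "${bootstrap_servers}",
      "group_id": "${group_id}",
      "topic": "${topic}",
      "partition": 0,
      "offset": 0,
      "num_messages": 5
    }
  }
}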
delete_consumer_group
Type | action |
Module | chaoskafka.actions |
Name | delete_consumer_group |
Return | boolean |
Delete a Kafka consumer group.
This function deletes a specified Kafka consumer group from the cluster. It uses the Kafka AdminClient to issue the delete operation.
Args: bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers. group_id (str): The ID of the consumer group to delete.
Returns: bool: True if the consumer group was successfully deleted, False otherwise.
Raises: FailedActivity: If there is an issue deleting the consumer group.
Note: To delete a consumer group, make sure it has no members.
Signature:
def delete_consumer_group(bootstrap_servers: str = None,
group_id: str = None,
configuration: Dict[str, Dict[str, str]] = None,
secrets: Dict[str, Dict[str, str]] = None) -> bool:
pass
Arguments:
Name | Type | Default | Required |
---|---|---|---|
bootstrap_servers | string | null | No |
group_id | string | null | No |
Usage:
{
  "name": "delete-consumer-group",
  "type": "action",
  "provider": {
    "type": "python",
    "module": "chaoskafka.actions",
    "func": "delete_consumer_group"
  }
}
name: delete-consumer-group
provider:
  func: delete_consumer_group
  module: chaoskafka.actions
  type: python
type: action
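With arguments supplied, reusing the ${bootstrap_servers} and ${group_id} substitutions from the experiment configuration (illustrative only):
{
  "name": "delete-consumer-group",
  "type": "action",
  "provider": {
    "type": "python",
    "module": "chaoskafka.actions",
    "func": "delete_consumer_group",
    "arguments": {
      "bootstrap_servers": "${bootstrap_servers}",
      "group_id": "${group_id}"
    }
  }
}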
delete_kafka_topic
Type | action |
Module | chaoskafka.actions |
Name | delete_kafka_topic |
Return | boolean |
Delete a Kafka topic.
This function deletes a specified Kafka topic from the cluster. It uses the Kafka AdminClient to issue the delete operation.
Args: bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers. topic (str): The name of the Kafka topic to delete.
Returns: bool: True if the topic was successfully deleted, False otherwise.
Raises: FailedActivity: If there is an issue deleting the topic.
Signature:
def delete_kafka_topic(bootstrap_servers: str = None,
topic: str = None,
configuration: Dict[str, Dict[str, str]] = None,
secrets: Dict[str, Dict[str, str]] = None) -> bool:
pass
Arguments:
Name | Type | Default | Required |
---|---|---|---|
bootstrap_servers | string | null | No |
topic | string | null | No |
Usage:
{
  "name": "delete-kafka-topic",
  "type": "action",
  "provider": {
    "type": "python",
    "module": "chaoskafka.actions",
    "func": "delete_kafka_topic"
  }
}
name: delete-kafka-topic
provider:
  func: delete_kafka_topic
  module: chaoskafka.actions
  type: python
type: action
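A minimal sketch with arguments, again relying on the configuration substitutions:
{
  "name": "delete-kafka-topic",
  "type": "action",
  "provider": {
    "type": "python",
    "module": "chaoskafka.actions",
    "func": "delete_kafka_topic",
    "arguments": {
      "bootstrap_servers": "${bootstrap_servers}",
      "topic": "${topic}"
    }
  }
}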
produce_messages
Type | action |
Module | chaoskafka.actions |
Name | produce_messages |
Return | list |
Produce messages to a Kafka topic.
This function sends a list of messages to a specified Kafka topic using the Kafka Producer. Each message is sent to a specified partition within the topic.
Args: bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers. client_id (str): The client ID for the producer. topic (str): The name of the Kafka topic to produce messages to. messages (List[str]): A list of messages to be sent to the topic. partition (int): The partition within the topic to send messages to.
Returns: List[Dict]: A list of dictionaries containing the result of each message produced, with either an “error” key or a “message” key.
Raises: FailedActivity: If the topic is None or if there is an issue producing the messages.
Signature:
def produce_messages(bootstrap_servers: str = None,
client_id: str = 'chaostoolkit',
topic: str = None,
messages: List[str] = [],
partition: int = 0,
configuration: Dict[str, Dict[str, str]] = None,
secrets: Dict[str, Dict[str, str]] = None) -> List[dict]:
pass
Arguments:
Name | Type | Default | Required |
---|---|---|---|
bootstrap_servers | string | null | No |
client_id | string | “chaostoolkit” | No |
topic | string | null | No |
messages | list | [] | No |
partition | integer | 0 | No |
Usage:
{
  "name": "produce-messages",
  "type": "action",
  "provider": {
    "type": "python",
    "module": "chaoskafka.actions",
    "func": "produce_messages"
  }
}
name: produce-messages
provider:
  func: produce_messages
  module: chaoskafka.actions
  type: python
type: action
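For example, to send a couple of illustrative messages to partition 0 of the configured topic (the message payloads are placeholders):
{
  "name": "produce-messages",
  "type": "action",
  "provider": {
    "type": "python",
    "module": "chaoskafka.actions",
    "func": "produce_messages",
    "arguments": {
      "bootstrap_servers": "${bootstrap_servers}",
      "topic": "${topic}",
      "partition": 0,
      "messages": ["order-1", "order-2"]
    }
  }
}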
rebalance_consumer_group
Type | action |
Module | chaoskafka.actions |
Name | rebalance_consumer_group |
Return | boolean |
Rebalance a Kafka consumer group.
This function forces a rebalance of a specified Kafka consumer group by subscribing to a given topic, waiting for a short period, and then closing the consumer. This action triggers the consumer group to rebalance.
Args: bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers. topic (str): The name of the Kafka topic to subscribe to. group_id (str): The consumer group ID.
Returns: bool: True if the rebalance was successfully triggered, False otherwise.
Raises: FailedActivity: If the topic is None or if there is an issue rebalancing the consumer group.
Signature:
def rebalance_consumer_group(
bootstrap_servers: str = None,
topic: str = None,
group_id: str = None,
configuration: Dict[str, Dict[str, str]] = None,
secrets: Dict[str, Dict[str, str]] = None) -> bool:
pass
Arguments:
Name | Type | Default | Required |
---|---|---|---|
bootstrap_servers | string | null | No |
topic | string | null | No |
group_id | string | null | No |
Usage:
{
  "name": "rebalance-consumer-group",
  "type": "action",
  "provider": {
    "type": "python",
    "module": "chaoskafka.actions",
    "func": "rebalance_consumer_group"
  }
}
name: rebalance-consumer-group
provider:
  func: rebalance_consumer_group
  module: chaoskafka.actions
  type: python
type: action
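With arguments supplied from the experiment configuration, a call might look like this:
{
  "name": "rebalance-consumer-group",
  "type": "action",
  "provider": {
    "type": "python",
    "module": "chaoskafka.actions",
    "func": "rebalance_consumer_group",
    "arguments": {
      "bootstrap_servers": "${bootstrap_servers}",
      "topic": "${topic}",
      "group_id": "${group_id}"
    }
  }
}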
probes
all_replicas_in_sync
Type | probe |
Module | chaoskafka.probes |
Name | all_replicas_in_sync |
Return | boolean |
Check if all replicas for each partition of a Kafka topic are in sync with the leader.
This function verifies the health of a specified Kafka topic by ensuring that all replicas for each partition are in sync with the leader. If all replicas are in sync, the topic is considered healthy.
Args: bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers. topic (str): The name of the Kafka topic to check.
Returns: bool: True if all replicas for each partition are in sync with the leader, False otherwise.
Raises: FailedActivity: If a Kafka exception is raised.
Signature:
def all_replicas_in_sync(bootstrap_servers: str = None,
topic: str = None,
configuration: Dict[str, Dict[str, str]] = None,
secrets: Dict[str, Dict[str, str]] = None) -> bool:
pass
Arguments:
Name | Type | Default | Required |
---|---|---|---|
bootstrap_servers | string | null | No |
topic | string | null | No |
Usage:
{
  "name": "all-replicas-in-sync",
  "type": "probe",
  "provider": {
    "type": "python",
    "module": "chaoskafka.probes",
    "func": "all_replicas_in_sync"
  }
}
name: all-replicas-in-sync
provider:
  func: all_replicas_in_sync
  module: chaoskafka.probes
  type: python
type: probe
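A sketch of the probe with its arguments filled in from the configuration:
{
  "name": "all-replicas-in-sync",
  "type": "probe",
  "provider": {
    "type": "python",
    "module": "chaoskafka.probes",
    "func": "all_replicas_in_sync",
    "arguments": {
      "bootstrap_servers": "${bootstrap_servers}",
      "topic": "${topic}"
    }
  }
}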
check_consumer_lag_under_threshold
Type | probe |
Module | chaoskafka.probes |
Name | check_consumer_lag_under_threshold |
Return | boolean |
Check if the consumer lag is under a certain threshold for a specific partition or all partitions.
This function checks the consumer lag for a specified Kafka topic and consumer group, and verifies if the lag is under a defined threshold. It can check the lag for either a specific partition or all partitions.
Args: bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers. group_id (str): The consumer group ID. topic (str): The name of the Kafka topic. threshold (int): The maximum allowable lag threshold. partition (Optional[int]): The specific partition to check. If None, checks all partitions.
Returns: bool: True if the consumer lag is under the threshold for the specified partition or all partitions, False otherwise.
Raises: FailedActivity: If there is an issue accessing the cluster metadata.
Signature:
def check_consumer_lag_under_threshold(
bootstrap_servers: str = None,
group_id: str = None,
topic: str = None,
threshold: int = 0,
partition: Optional[int] = None,
configuration: Dict[str, Dict[str, str]] = None,
secrets: Dict[str, Dict[str, str]] = None) -> bool:
pass
Arguments:
Name | Type | Default | Required |
---|---|---|---|
bootstrap_servers | string | null | No |
group_id | string | null | No |
topic | string | null | No |
threshold | integer | 0 | No |
partition | object | null | No |
Usage:
{
  "name": "check-consumer-lag-under-threshold",
  "type": "probe",
  "provider": {
    "type": "python",
    "module": "chaoskafka.probes",
    "func": "check_consumer_lag_under_threshold"
  }
}
name: check-consumer-lag-under-threshold
provider:
  func: check_consumer_lag_under_threshold
  module: chaoskafka.probes
  type: python
type: probe
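With arguments, this is essentially the probe used in the steady-state hypothesis of the experiment above (threshold and partition values are illustrative):
{
  "name": "check-consumer-lag-under-threshold",
  "type": "probe",
  "provider": {
    "type": "python",
    "module": "chaoskafka.probes",
    "func": "check_consumer_lag_under_threshold",
    "arguments": {
      "bootstrap_servers": "${bootstrap_servers}",
      "group_id": "${group_id}",
      "topic": "${topic}",
      "threshold": 15,
      "partition": 1
    }
  }
}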
cluster_doesnt_have_under_replicated_partitions
Type | probe |
Module | chaoskafka.probes |
Name | cluster_doesnt_have_under_replicated_partitions |
Return | boolean |
Check that the Kafka cluster has no under-replicated partitions.
This function verifies the health of the entire Kafka cluster by checking if any partition is under-replicated. A partition is considered under-replicated if the number of in-sync replicas (ISRs) is less than the total number of replicas.
Args: bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers.
Returns: bool: True if no partition in the cluster is under-replicated, False otherwise.
Raises: KafkaException: If there is an underlying Kafka-related error.
Signature:
def cluster_doesnt_have_under_replicated_partitions(
bootstrap_servers: str = None,
configuration: Dict[str, Dict[str, str]] = None,
secrets: Dict[str, Dict[str, str]] = None) -> bool:
pass
Arguments:
Name | Type | Default | Required |
---|---|---|---|
bootstrap_servers | string | null | No |
Usage:
{
  "name": "cluster-doesnt-have-under-replicated-partitions",
  "type": "probe",
  "provider": {
    "type": "python",
    "module": "chaoskafka.probes",
    "func": "cluster_doesnt_have_under_replicated_partitions"
  }
}
name: cluster-doesnt-have-under-replicated-partitions
provider:
  func: cluster_doesnt_have_under_replicated_partitions
  module: chaoskafka.probes
  type: python
type: probe
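With its single argument supplied from the configuration:
{
  "name": "cluster-doesnt-have-under-replicated-partitions",
  "type": "probe",
  "provider": {
    "type": "python",
    "module": "chaoskafka.probes",
    "func": "cluster_doesnt_have_under_replicated_partitions",
    "arguments": {
      "bootstrap_servers": "${bootstrap_servers}"
    }
  }
}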
describe_kafka_topic
Type | probe |
Module | chaoskafka.probes |
Name | describe_kafka_topic |
Return | mapping |
Describe a Kafka topic and its partitions.
This function retrieves metadata for a specified Kafka topic, including the number of partitions and detailed information about each partition. It first checks if the topic exists in the Kafka cluster.
Args: bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers. topic (str): The name of the Kafka topic to describe.
Returns: Dict: A dictionary containing detailed information about the topic and its partitions.
Raises: FailedActivity: If a Kafka exception is raised.
Signature:
def describe_kafka_topic(bootstrap_servers: str = None,
topic: str = None,
configuration: Dict[str, Dict[str, str]] = None,
secrets: Dict[str, Dict[str, str]] = None) -> Dict:
pass
Arguments:
Name | Type | Default | Required |
---|---|---|---|
bootstrap_servers | string | null | No |
topic | string | null | No |
Usage:
{
  "name": "describe-kafka-topic",
  "type": "probe",
  "provider": {
    "type": "python",
    "module": "chaoskafka.probes",
    "func": "describe_kafka_topic"
  }
}
name: describe-kafka-topic
provider:
  func: describe_kafka_topic
  module: chaoskafka.probes
  type: python
type: probe
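With arguments filled in from the configuration:
{
  "name": "describe-kafka-topic",
  "type": "probe",
  "provider": {
    "type": "python",
    "module": "chaoskafka.probes",
    "func": "describe_kafka_topic",
    "arguments": {
      "bootstrap_servers": "${bootstrap_servers}",
      "topic": "${topic}"
    }
  }
}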
topic_has_no_offline_partitions
Type | probe |
Module | chaoskafka.probes |
Name | topic_has_no_offline_partitions |
Return | boolean |
Check if a Kafka topic has no offline partitions.
This function verifies the health of a specified Kafka topic by checking if all its partitions have an active leader and at least one in-sync replica (ISR). If any partition is offline, the topic is not considered healthy.
Args: bootstrap_servers (str): A comma-separated list of Kafka bootstrap servers. topic (str): The name of the Kafka topic to check.
Returns: bool: True if all partitions are online and don’t have offline replicas, False otherwise.
Raises: FailedActivity: If a Kafka exception is raised.
Signature:
def topic_has_no_offline_partitions(
bootstrap_servers: str = None,
topic: str = None,
configuration: Dict[str, Dict[str, str]] = None,
secrets: Dict[str, Dict[str, str]] = None) -> bool:
pass
Arguments:
Name | Type | Default | Required |
---|---|---|---|
bootstrap_servers | string | null | No |
topic | string | null | No |
Usage:
{
  "name": "topic-has-no-offline-partitions",
  "type": "probe",
  "provider": {
    "type": "python",
    "module": "chaoskafka.probes",
    "func": "topic_has_no_offline_partitions"
  }
}
name: topic-has-no-offline-partitions
provider:
  func: topic_has_no_offline_partitions
  module: chaoskafka.probes
  type: python
type: probe
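With arguments filled in from the configuration, for example:
{
  "name": "topic-has-no-offline-partitions",
  "type": "probe",
  "provider": {
    "type": "python",
    "module": "chaoskafka.probes",
    "func": "topic_has_no_offline_partitions",
    "arguments": {
      "bootstrap_servers": "${bootstrap_servers}",
      "topic": "${topic}"
    }
  }
}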