Apache Kafka CLI for AI Agents
Use the Apache Kafka CLI from KosmoKrator to call Apache Kafka tools headlessly, return JSON, inspect schemas, and automate workflows from coding agents, scripts, and CI.
Apache Kafka CLI Setup
Apache Kafka can be configured headlessly with `kosmokrator integrations:configure kafka`.
# Install KosmoKrator first if it is not available on PATH.
curl -fsSL https://raw.githubusercontent.com/OpenCompanyApp/kosmokrator/main/install.sh | bash
# Configure and verify this integration.
kosmokrator integrations:configure kafka --set api_token="$KAFKA_API_TOKEN" --enable --read allow --write ask --json
kosmokrator integrations:doctor kafka --json
kosmokrator integrations:status --json
Credentials
Authentication type: API token (`api_token`). Configure credentials once, then reuse the same stored profile from scripts, coding CLIs, Lua, and MCP.
| Key | Env var | Type | Required | Label |
|---|---|---|---|---|
| api_token | KAFKA_API_TOKEN | Secret | yes | API Token |
| cluster_id | KAFKA_CLUSTER_ID | Text | no | Cluster ID |
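Because `api_token` is required, a CI job may want to stop before configuring if `KAFKA_API_TOKEN` is missing. The following is a minimal sketch of such a guard, reusing the setup commands shown earlier. The function name `configure_kafka` is hypothetical, and the sketch assumes the CLI exits with a non-zero status on failure, which this page does not state.

```shell
# configure_kafka
# Hypothetical helper: refuses to run the configure step when the
# token variable is unset or empty, then runs the doctor check.
configure_kafka() {
  if [ -z "${KAFKA_API_TOKEN:-}" ]; then
    echo "KAFKA_API_TOKEN is not set" >&2
    return 1
  fi
  # Same flags as the setup commands above. The doctor check only runs
  # if configure succeeds (assumes a non-zero exit status on failure).
  kosmokrator integrations:configure kafka \
    --set api_token="$KAFKA_API_TOKEN" \
    --enable --read allow --write ask --json &&
  kosmokrator integrations:doctor kafka --json
}

# Example:
#   configure_kafka || exit 1
```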
Command Patterns
The generic command is stable across every integration. The provider shortcut is shorter for humans.
kosmo integrations:call kafka.kafka_list_topics '{"cluster_id":"example_cluster_id"}' --json
kosmo integrations:kafka kafka_list_topics '{"cluster_id":"example_cluster_id"}' --json
Discovery
These commands return structured output for coding agents that need to inspect capabilities before choosing a function.
kosmo integrations:docs kafka --json
kosmo integrations:docs kafka.kafka_list_topics --json
kosmo integrations:schema kafka.kafka_list_topics --json
kosmo integrations:search "Apache Kafka" --json
kosmo integrations:list --json
Automation Contexts
The same configured command surface works across automation contexts such as coding agents, scripts, and CI. The command does not change unless the host wrapper, credentials, or permissions change.
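Since the command stays the same in every context, a script can wrap the generic `integrations:call` form once and reuse it. This is a minimal sketch: `kafka_call` is a hypothetical helper name, and relying on the exit status of `kosmo` to signal failure is an assumption rather than documented behavior.

```shell
# kafka_call FUNCTION [JSON_PAYLOAD]
# Hypothetical helper: runs the generic integrations:call form for a
# Kafka function and passes through the CLI's stdout and exit status.
# An omitted or empty payload is sent as '{}'.
kafka_call() {
  if [ -n "${2:-}" ]; then
    kosmo integrations:call "kafka.$1" "$2" --json
  else
    kosmo integrations:call "kafka.$1" '{}' --json
  fi
}

# Example (requires a configured kafka integration):
#   kafka_call kafka_list_clusters
#   kafka_call kafka_list_topics '{"cluster_id":"example_cluster_id"}'
```

Keeping the payload in single quotes at the call site avoids having the shell interpret the double quotes inside the JSON.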
CLI Functions
Every function below can be called headlessly.
kafka.kafka_list_topics
List Kafka topics in a cluster. Returns topic names, partition counts, replication factors, and status.
Permission: read
Parameters: cluster_id
kosmo integrations:call kafka.kafka_list_topics '{"cluster_id":"example_cluster_id"}' --json
kosmo integrations:kafka kafka_list_topics '{"cluster_id":"example_cluster_id"}' --json
kafka.kafka_get_topic
Get full details of a specific Kafka topic by name. Returns partition count, replication factor, and topic configuration.
Permission: read
Parameters: topic_name, cluster_id
kosmo integrations:call kafka.kafka_get_topic '{"topic_name":"example_topic_name","cluster_id":"example_cluster_id"}' --json
kosmo integrations:kafka kafka_get_topic '{"topic_name":"example_topic_name","cluster_id":"example_cluster_id"}' --json
kafka.kafka_create_topic
Create a new Kafka topic in a cluster. Specify the topic name, partition count, and optional replication factor and configs.
Permission: write
Parameters: topic_name, partitions_count, replication_factor, configs, cluster_id
kosmo integrations:call kafka.kafka_create_topic '{"topic_name":"example_topic_name","partitions_count":1,"replication_factor":1,"configs":"example_configs","cluster_id":"example_cluster_id"}' --json
kosmo integrations:kafka kafka_create_topic '{"topic_name":"example_topic_name","partitions_count":1,"replication_factor":1,"configs":"example_configs","cluster_id":"example_cluster_id"}' --json
kafka.kafka_list_clusters
List Kafka clusters in your Confluent Cloud environment. Returns cluster IDs, names, types, and status.
Permission: read
Parameters: none
kosmo integrations:call kafka.kafka_list_clusters '{}' --json
kosmo integrations:kafka kafka_list_clusters '{}' --json
kafka.kafka_get_cluster
Get details of a specific Kafka cluster. Returns broker count, controller info, and cluster configuration.
Permission: read
Parameters: cluster_id
kosmo integrations:call kafka.kafka_get_cluster '{"cluster_id":"example_cluster_id"}' --json
kosmo integrations:kafka kafka_get_cluster '{"cluster_id":"example_cluster_id"}' --json
kafka.kafka_list_producers
List producers for a specific Kafka topic. Returns producer IDs, client IDs, and connection details.
Permission: read
Parameters: topic_name, cluster_id
kosmo integrations:call kafka.kafka_list_producers '{"topic_name":"example_topic_name","cluster_id":"example_cluster_id"}' --json
kosmo integrations:kafka kafka_list_producers '{"topic_name":"example_topic_name","cluster_id":"example_cluster_id"}' --json
kafka.kafka_get_current_user
Get the currently authenticated Confluent Cloud user. Useful for verifying credentials and identifying the connected account.
Permission: read
Parameters: none
kosmo integrations:call kafka.kafka_get_current_user '{}' --json
kosmo integrations:kafka kafka_get_current_user '{}' --json
Function Schemas
Use these parameter tables when building CLI payloads without calling integrations:schema first.
kafka.kafka_list_topics (1 parameter)
kosmo integrations:schema kafka.kafka_list_topics --json
| Parameter | Type | Required | Description |
|---|---|---|---|
| cluster_id | string | no | Override the default Kafka cluster ID. |
kafka.kafka_get_topic (2 parameters)
kosmo integrations:schema kafka.kafka_get_topic --json
| Parameter | Type | Required | Description |
|---|---|---|---|
| topic_name | string | yes | The name of the topic to retrieve. |
| cluster_id | string | no | Override the default Kafka cluster ID. |
kafka.kafka_create_topic (5 parameters)
kosmo integrations:schema kafka.kafka_create_topic --json
| Parameter | Type | Required | Description |
|---|---|---|---|
| topic_name | string | yes | The name for the new topic. |
| partitions_count | integer | yes | Number of partitions for the topic (e.g., 6). |
| replication_factor | integer | no | Replication factor (e.g., 3 for production). Defaults to the cluster default. |
| configs | object | no | JSON-encoded topic configs: retention.ms, cleanup.policy, etc. |
| cluster_id | string | no | Override the default Kafka cluster ID. |
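The table above separates required fields (`topic_name`, `partitions_count`) from optional ones. As a rough sketch of how a script might build a matching payload, the hypothetical helper below emits only the required fields plus `replication_factor` when it is given, so the cluster default applies otherwise. The helper names and the topic name `orders` are illustrative, and `configs` is left out because its exact encoding is not shown on this page.

```shell
# is_uint VALUE
# Succeeds only if VALUE is a non-empty string of digits.
is_uint() {
  case "$1" in
    ''|*[!0-9]*) return 1 ;;
  esac
}

# create_topic_payload NAME PARTITIONS [REPLICATION_FACTOR]
# Prints a JSON payload for kafka.kafka_create_topic.
# NAME is inserted verbatim: this sketch does not escape quotes or
# backslashes, so pass plain topic names only.
create_topic_payload() {
  if ! is_uint "${2:-}"; then
    echo "partitions_count must be a whole number" >&2
    return 1
  fi
  if [ -n "${3:-}" ]; then
    if ! is_uint "$3"; then
      echo "replication_factor must be a whole number" >&2
      return 1
    fi
    printf '{"topic_name":"%s","partitions_count":%s,"replication_factor":%s}\n' "$1" "$2" "$3"
  else
    printf '{"topic_name":"%s","partitions_count":%s}\n' "$1" "$2"
  fi
}

# Example:
#   kosmo integrations:call kafka.kafka_create_topic "$(create_topic_payload orders 6 3)" --json
```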
kafka.kafka_list_clusters (0 parameters)
kosmo integrations:schema kafka.kafka_list_clusters --json
| Parameter | Type | Required | Description |
|---|---|---|---|
| No parameters. | | | |
kafka.kafka_get_cluster (1 parameter)
kosmo integrations:schema kafka.kafka_get_cluster --json
| Parameter | Type | Required | Description |
|---|---|---|---|
| cluster_id | string | no | The cluster ID to retrieve. Uses the default cluster if not specified. |
kafka.kafka_list_producers (2 parameters)
kosmo integrations:schema kafka.kafka_list_producers --json
| Parameter | Type | Required | Description |
|---|---|---|---|
| topic_name | string | yes | The topic name to list producers for. |
| cluster_id | string | no | Override the default Kafka cluster ID. |
kafka.kafka_get_current_user (0 parameters)
kosmo integrations:schema kafka.kafka_get_current_user --json
| Parameter | Type | Required | Description |
|---|---|---|---|
| No parameters. | | | |
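The tables above are a snapshot. When an agent prefers the live schemas, one option is to save the output of `integrations:schema` for each function to a file. The sketch below assumes the seven function names listed on this page; `dump_kafka_schemas` and the one-file-per-function layout are hypothetical choices, and the contents of each file are whatever the CLI prints.

```shell
# Function names as listed on this page.
KAFKA_FUNCTIONS="kafka_list_topics kafka_get_topic kafka_create_topic kafka_list_clusters kafka_get_cluster kafka_list_producers kafka_get_current_user"

# dump_kafka_schemas DIR
# Hypothetical helper: writes one <function>.json file per Kafka
# function into DIR and stops at the first failing command.
dump_kafka_schemas() {
  mkdir -p "$1" || return 1
  # Unquoted on purpose: relies on sh/bash word splitting of the list.
  for fn in $KAFKA_FUNCTIONS; do
    kosmo integrations:schema "kafka.$fn" --json > "$1/$fn.json" || return 1
  done
}

# Example:
#   dump_kafka_schemas ./kafka-schemas
```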
Permissions
Headless calls still follow the integration read/write permission policy. Configure read/write defaults with `integrations:configure`. Add `--force` only for trusted automation that should bypass that policy.
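One way to keep `--force` out of everyday scripts is to gate it behind an explicit opt-in. The following is a sketch under two assumptions that this page does not confirm: that `--force` is appended to the `integrations:call` command, and that an environment variable is an acceptable switch. `KOSMO_TRUSTED` and `kafka_call_maybe_force` are hypothetical names defined by the sketch, not by KosmoKrator.

```shell
# kafka_call_maybe_force FUNCTION JSON_PAYLOAD
# Appends --force only when KOSMO_TRUSTED=1 (a hypothetical variable
# introduced by this sketch). Otherwise the call follows the configured
# read/write permission policy.
kafka_call_maybe_force() {
  if [ "${KOSMO_TRUSTED:-0}" = "1" ]; then
    # Assumption: --force is accepted at the end of integrations:call.
    kosmo integrations:call "kafka.$1" "$2" --json --force
  else
    kosmo integrations:call "kafka.$1" "$2" --json
  fi
}

# Example:
#   KOSMO_TRUSTED=1 kafka_call_maybe_force kafka_create_topic '{"topic_name":"example_topic_name","partitions_count":1}'
```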