Apache Kafka CLI for AI Agents

Use the Apache Kafka CLI from KosmoKrator to call Apache Kafka tools headlessly, return JSON, inspect schemas, and automate workflows from coding agents, scripts, and CI.

Apache Kafka CLI Setup

Apache Kafka can be configured headlessly with `kosmokrator integrations:configure kafka`.

Install, configure, and verify
# Install KosmoKrator first if it is not available on PATH.
curl -fsSL https://raw.githubusercontent.com/OpenCompanyApp/kosmokrator/main/install.sh | bash

# Configure and verify this integration.
kosmokrator integrations:configure kafka --set api_token="$KAFKA_API_TOKEN" --enable --read allow --write ask --json
kosmokrator integrations:doctor kafka --json
kosmokrator integrations:status --json
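A script or CI job may want to fail fast before running the commands above. The sketch below is one way to do that and is not part of KosmoKrator: `kafka_preflight` and the `KOSMO_BIN` override are hypothetical names. It relies only on the `KAFKA_API_TOKEN` variable and the `kosmokrator` binary name used on this page.

```shell
# Hypothetical preflight check; not a KosmoKrator command.
# Returns 1 if the token is missing, 2 if the binary is not on PATH.
kafka_preflight() {
  if [ -z "${KAFKA_API_TOKEN:-}" ]; then
    echo "KAFKA_API_TOKEN is not set" >&2
    return 1
  fi
  # KOSMO_BIN is an illustrative override for the binary name.
  if ! command -v "${KOSMO_BIN:-kosmokrator}" >/dev/null 2>&1; then
    echo "kosmokrator not found on PATH; install it first" >&2
    return 2
  fi
  return 0
}
```

Running `kafka_preflight || exit 1` ahead of `integrations:configure` stops the job with a clear message before any configuration is attempted.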

Credentials

Authentication type: API token (`api_token`). Configure credentials once, then reuse the same stored profile from scripts, coding CLIs, Lua, and MCP.

| Key | Env var | Type | Required | Label |
| --- | --- | --- | --- | --- |
| api_token | KAFKA_API_TOKEN | secret | yes | API Token |
| cluster_id | KAFKA_CLUSTER_ID | text | no | Cluster ID |

Command Patterns

The generic form, `integrations:call <provider>.<function>`, has the same shape for every integration. The provider shortcut, `integrations:<provider> <function>`, is shorter to type by hand.

Generic CLI call
kosmo integrations:call kafka.kafka_list_topics '{"cluster_id":"example_cluster_id"}' --json
Provider shortcut
kosmo integrations:kafka kafka_list_topics '{"cluster_id":"example_cluster_id"}' --json
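Scripts that call several Kafka functions can wrap the generic form once. The following is a minimal sketch, not a KosmoKrator feature: `kafka_call` and the `KOSMO_BIN` override are hypothetical names, and only the `integrations:call kafka.<function> '<json>' --json` shape comes from the commands above.

```shell
# Hypothetical wrapper around the generic call form shown above.
# KOSMO_BIN is an illustrative override, not a KosmoKrator setting.
kafka_call() {
  fn="$1"            # function name without the "kafka." prefix
  payload="${2:-}"   # JSON payload; optional
  # Parameterless functions still receive an empty JSON object.
  [ -n "$payload" ] || payload='{}'
  "${KOSMO_BIN:-kosmo}" integrations:call "kafka.${fn}" "$payload" --json
}
```

For example, `kafka_call kafka_list_topics '{"cluster_id":"example_cluster_id"}'` runs the same command as the generic call above. The payload default is assigned in a separate step because `${2:-{}}` would append a stray `}` whenever a payload is supplied.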

Discovery

These commands return structured output for coding agents that need to inspect capabilities before choosing a function.

Discovery commands
kosmo integrations:docs kafka --json
kosmo integrations:docs kafka.kafka_list_topics --json
kosmo integrations:schema kafka.kafka_list_topics --json
kosmo integrations:search "Apache Kafka" --json
kosmo integrations:list --json
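An agent that wants every schema up front can loop over the function names documented on this page. This is a sketch under assumptions: `dump_kafka_schemas` and `KOSMO_BIN` are hypothetical names, and the one-file-per-function layout is an illustrative choice rather than anything KosmoKrator prescribes.

```shell
# Hypothetical helper: saves each function's schema to <out_dir>/<function>.json.
# The function names are the ones listed on this page.
dump_kafka_schemas() {
  out_dir="$1"
  mkdir -p "$out_dir" || return 1
  for fn in kafka_list_topics kafka_get_topic kafka_create_topic \
            kafka_list_clusters kafka_get_cluster kafka_list_producers \
            kafka_get_current_user; do
    # Same schema command as above, redirected to a file.
    "${KOSMO_BIN:-kosmo}" integrations:schema "kafka.${fn}" --json \
      > "${out_dir}/${fn}.json" || return 1
  done
}
```

Running `dump_kafka_schemas ./kafka-schemas` would leave seven JSON files that a later step can read without invoking the CLI again.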

Automation Contexts

The same configured commands work unchanged across automation environments such as coding agents, scripts, and CI. A command changes only when the host wrapper, credentials, or permissions change.

CLI Functions

Every function below can be called headlessly.

kafka.kafka_list_topics

List Kafka topics in a cluster. Returns topic names, partition counts, replication factors, and status.

Access: read
Parameters: cluster_id
Generic call
kosmo integrations:call kafka.kafka_list_topics '{"cluster_id":"example_cluster_id"}' --json
Shortcut
kosmo integrations:kafka kafka_list_topics '{"cluster_id":"example_cluster_id"}' --json

kafka.kafka_get_topic

Get full details of a specific Kafka topic by name. Returns partition count, replication factor, and topic configuration.

Access: read
Parameters: topic_name, cluster_id
Generic call
kosmo integrations:call kafka.kafka_get_topic '{"topic_name":"example_topic_name","cluster_id":"example_cluster_id"}' --json
Shortcut
kosmo integrations:kafka kafka_get_topic '{"topic_name":"example_topic_name","cluster_id":"example_cluster_id"}' --json

kafka.kafka_create_topic

Create a new Kafka topic in a cluster. Specify the topic name, partition count, and optional replication factor and configs.

Access: write
Parameters: topic_name, partitions_count, replication_factor, configs, cluster_id
Generic call
kosmo integrations:call kafka.kafka_create_topic '{"topic_name":"example_topic_name","partitions_count":1,"replication_factor":1,"configs":"example_configs","cluster_id":"example_cluster_id"}' --json
Shortcut
kosmo integrations:kafka kafka_create_topic '{"topic_name":"example_topic_name","partitions_count":1,"replication_factor":1,"configs":"example_configs","cluster_id":"example_cluster_id"}' --json
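The placeholder payload above passes `configs` as a string, while the schema lists it as an object. The sketch below assumes the object form with string values, which is an assumption and should be checked against `kosmo integrations:schema kafka.kafka_create_topic --json`. `create_topic_payload` is a hypothetical helper that only builds the JSON; it does not call the CLI.

```shell
# Hypothetical helper: prints a kafka_create_topic payload.
# Assumption: "configs" is a JSON object whose values are strings.
create_topic_payload() {
  topic="$1"; partitions="$2"; retention_ms="$3"
  # Kafka topic names may only contain ASCII letters, digits, '.', '_' and '-',
  # so a validated name needs no JSON escaping.
  case "$topic" in
    ''|*[!A-Za-z0-9._-]*) echo "invalid topic name: $topic" >&2; return 1 ;;
  esac
  # This sketch accepts only non-negative integers for the numeric fields.
  for n in "$partitions" "$retention_ms"; do
    case "$n" in
      ''|*[!0-9]*) echo "not a non-negative integer: $n" >&2; return 1 ;;
    esac
  done
  printf '{"topic_name":"%s","partitions_count":%s,"configs":{"retention.ms":"%s"}}\n' \
    "$topic" "$partitions" "$retention_ms"
}
```

The output can be passed as the payload argument, for example `kosmo integrations:call kafka.kafka_create_topic "$(create_topic_payload orders 6 604800000)" --json`, where `orders` and the retention value are illustrative.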

kafka.kafka_list_clusters

List Kafka clusters in your Confluent Cloud environment. Returns cluster IDs, names, types, and status.

Access: read
Parameters: none
Generic call
kosmo integrations:call kafka.kafka_list_clusters '{}' --json
Shortcut
kosmo integrations:kafka kafka_list_clusters '{}' --json

kafka.kafka_get_cluster

Get details of a specific Kafka cluster. Returns broker count, controller info, and cluster configuration.

Access: read
Parameters: cluster_id
Generic call
kosmo integrations:call kafka.kafka_get_cluster '{"cluster_id":"example_cluster_id"}' --json
Shortcut
kosmo integrations:kafka kafka_get_cluster '{"cluster_id":"example_cluster_id"}' --json

kafka.kafka_list_producers

List producers for a specific Kafka topic. Returns producer IDs, client IDs, and connection details.

Access: read
Parameters: topic_name, cluster_id
Generic call
kosmo integrations:call kafka.kafka_list_producers '{"topic_name":"example_topic_name","cluster_id":"example_cluster_id"}' --json
Shortcut
kosmo integrations:kafka kafka_list_producers '{"topic_name":"example_topic_name","cluster_id":"example_cluster_id"}' --json

kafka.kafka_get_current_user

Get the currently authenticated Confluent Cloud user. Useful for verifying credentials and identifying the connected account.

Access: read
Parameters: none
Generic call
kosmo integrations:call kafka.kafka_get_current_user '{}' --json
Shortcut
kosmo integrations:kafka kafka_get_current_user '{}' --json

Function Schemas

Use these parameter tables when building CLI payloads without calling integrations:schema first.

kafka.kafka_list_topics (1 parameter)
Schema command
kosmo integrations:schema kafka.kafka_list_topics --json
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| cluster_id | string | no | Override the default Kafka cluster ID. |

kafka.kafka_get_topic (2 parameters)
Schema command
kosmo integrations:schema kafka.kafka_get_topic --json
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| topic_name | string | yes | The name of the topic to retrieve. |
| cluster_id | string | no | Override the default Kafka cluster ID. |

kafka.kafka_create_topic (5 parameters)
Schema command
kosmo integrations:schema kafka.kafka_create_topic --json
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| topic_name | string | yes | The name for the new topic. |
| partitions_count | integer | yes | Number of partitions for the topic (e.g., 6). |
| replication_factor | integer | no | Replication factor (e.g., 3 for production). Defaults to the cluster default. |
| configs | object | no | JSON-encoded topic configs: retention.ms, cleanup.policy, etc. |
| cluster_id | string | no | Override the default Kafka cluster ID. |

kafka.kafka_list_clusters (0 parameters)
Schema command
kosmo integrations:schema kafka.kafka_list_clusters --json
No parameters.

kafka.kafka_get_cluster (1 parameter)
Schema command
kosmo integrations:schema kafka.kafka_get_cluster --json
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| cluster_id | string | no | The cluster ID to retrieve. Uses the default cluster if not specified. |

kafka.kafka_list_producers (2 parameters)
Schema command
kosmo integrations:schema kafka.kafka_list_producers --json
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| topic_name | string | yes | The topic name to list producers for. |
| cluster_id | string | no | Override the default Kafka cluster ID. |

kafka.kafka_get_current_user (0 parameters)
Schema command
kosmo integrations:schema kafka.kafka_get_current_user --json
No parameters.

Permissions

Headless calls still follow the integration read/write permission policy. Configure read/write defaults with integrations:configure. Add --force only for trusted automation that should bypass that policy.
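One way to keep `--force` out of ordinary runs is to gate it behind an explicit opt-in. The sketch below is illustrative only: `kafka_call_guarded`, `KAFKA_TRUSTED_AUTOMATION`, and `KOSMO_BIN` are hypothetical names, and it assumes `--force` is accepted on `integrations:call`, which this page does not state explicitly.

```shell
# Hypothetical guard: adds --force only when the caller opts in.
# Assumption: --force is a valid flag on integrations:call.
kafka_call_guarded() {
  fn="$1"; payload="$2"
  if [ "${KAFKA_TRUSTED_AUTOMATION:-0}" = "1" ]; then
    # Trusted automation: bypass the read/write permission policy.
    "${KOSMO_BIN:-kosmo}" integrations:call "kafka.${fn}" "$payload" --json --force
  else
    # Default: the configured read/write policy still applies.
    "${KOSMO_BIN:-kosmo}" integrations:call "kafka.${fn}" "$payload" --json
  fi
}
```

With this shape, a pipeline has to set the opt-in variable deliberately before a write such as `kafka_create_topic` can skip the configured policy.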