data
Apache Kafka Lua API for KosmoKrator Agents
Agent-facing Lua documentation and function reference for the Apache Kafka KosmoKrator integration.Lua Namespace
Agents call this integration through app.integrations.kafka.*.
Use lua_read_doc("integrations.kafka") inside KosmoKrator to discover the same reference at runtime.
Call Lua from the Headless CLI
Use kosmo integrations:lua when a shell script, CI job, cron job, or another coding CLI should run a deterministic
Apache Kafka workflow without starting an interactive agent session.
kosmo integrations:lua --eval 'dump(app.integrations.kafka.list_topics({cluster_id = "example_cluster_id"}))' --json kosmo integrations:lua --eval 'print(docs.read("kafka"))' --json
kosmo integrations:lua --eval 'print(docs.read("kafka.list_topics"))' --json Workflow file
Put repeatable logic in a Lua file, then execute it with JSON output for the calling process.
local kafka = app.integrations.kafka
local result = kafka.list_topics({cluster_id = "example_cluster_id"})
dump(result) kosmo integrations:lua workflow.lua --json
kosmo integrations:lua workflow.lua --force --json integrations:lua exposes app.integrations.kafka, app.mcp.*, docs.*, json.*, and regex.*. Use app.integrations.kafka.default.* or app.integrations.kafka.work.* when you configured named credential accounts.
MCP-only Lua
If the script only needs configured MCP servers and does not need Apache Kafka, use the narrower mcp:lua command.
# Use mcp:lua for MCP-only scripts; use integrations:lua for this integration namespace.
kosmo mcp:lua --eval 'dump(mcp.servers())' --json Agent-Facing Lua Docs
This is the rendered version of the full Lua documentation exposed to agents when they inspect the integration namespace.
Apache Kafka (Confluent Cloud) — Lua API Reference
list_topics
List Kafka topics in a cluster.
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
cluster_id | string | no | Override the default Kafka cluster ID |
Example
local result = app.integrations.kafka.list_topics({})
for _, topic in ipairs(result.data or {}) do
print(topic.topic_name .. " (partitions: " .. topic.partitions_count .. ")")
end
get_topic
Get full details of a specific Kafka topic.
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
topic_name | string | yes | The name of the topic to retrieve |
cluster_id | string | no | Override the default Kafka cluster ID |
Example
local result = app.integrations.kafka.get_topic({
topic_name = "orders"
})
print("Topic: " .. result.topic_name)
print("Partitions: " .. result.partitions_count)
print("Replication: " .. (result.replication_factor or "default"))
create_topic
Create a new Kafka topic in a cluster.
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
topic_name | string | yes | The name for the new topic |
partitions_count | integer | yes | Number of partitions (e.g., 6) |
replication_factor | integer | no | Replication factor (e.g., 3 for production) |
configs | object | no | JSON-encoded topic configs: retention.ms, cleanup.policy, etc. |
cluster_id | string | no | Override the default Kafka cluster ID |
Topic Config Options
Common configuration options:
{
"retention.ms": "604800000",
"cleanup.policy": "delete",
"max.message.bytes": "1048576"
}
Example
local result = app.integrations.kafka.create_topic({
topic_name = "events",
partitions_count = 6,
replication_factor = 3,
configs = '{"retention.ms":"604800000","cleanup.policy":"delete"}'
})
print("Created topic: " .. result.topic_name)
list_clusters
List Kafka clusters in your Confluent Cloud environment.
Parameters
None.
Example
local result = app.integrations.kafka.list_clusters({})
for _, cluster in ipairs(result.data or {}) do
print(cluster.cluster_id .. ": " .. (cluster.display_name or "unnamed"))
end
get_cluster
Get details of a specific Kafka cluster.
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
cluster_id | string | no | The cluster ID to retrieve (uses default if not specified) |
Example
local result = app.integrations.kafka.get_cluster({
cluster_id = "lkc-abc123"
})
print("Cluster: " .. (result.display_name or result.cluster_id))
print("Brokers: " .. (result.broker_count or "unknown"))
print("Controller: " .. (result.controller_id or "unknown"))
list_producers
List producers for a specific Kafka topic.
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
topic_name | string | yes | The topic name to list producers for |
cluster_id | string | no | Override the default Kafka cluster ID |
Example
local result = app.integrations.kafka.list_producers({
topic_name = "orders"
})
for _, producer in ipairs(result.data or {}) do
print("Producer: " .. (producer.client_id or producer.producer_id))
end
get_current_user
Get the currently authenticated Confluent Cloud user.
Parameters
None.
Example
local result = app.integrations.kafka.get_current_user({})
print("User: " .. (result.handle or "unknown"))
print("Name: " .. (result.full_name or "unknown"))
print("Email: " .. (result.email or "unknown"))
Multi-Account Usage
If you have multiple Kafka accounts configured, use account-specific namespaces:
-- Default account (always works)
app.integrations.kafka.list_topics({})
-- Explicit default (portable across setups)
app.integrations.kafka.default.list_topics({})
-- Named accounts
app.integrations.kafka.production.list_topics({})
app.integrations.kafka.staging.list_topics({})
All functions are identical across accounts — only the credentials differ.
Common Patterns
Create a topic with production settings
local result = app.integrations.kafka.create_topic({
topic_name = "user-events",
partitions_count = 12,
replication_factor = 3,
configs = '{"retention.ms":"259200000","cleanup.policy":"compact,delete"}'
})
print("Created topic: " .. result.topic_name)
List all topics and their partition counts
local result = app.integrations.kafka.list_topics({})
local topics = result.data or {}
print("Found " .. #topics .. " topics:")
for _, topic in ipairs(topics) do
print(" - " .. topic.topic_name .. " (" .. topic.partitions_count .. " partitions)")
end
Check cluster health and verify credentials
-- Verify credentials
local user = app.integrations.kafka.get_current_user({})
print("Connected as: " .. (user.full_name or user.handle))
-- Get cluster details
local cluster = app.integrations.kafka.get_cluster({})
print("Cluster: " .. (cluster.display_name or cluster.cluster_id))
print("Status: " .. (cluster.status or "unknown"))Raw agent markdown
# Apache Kafka (Confluent Cloud) — Lua API Reference
## list_topics
List Kafka topics in a cluster.
### Parameters
| Name | Type | Required | Description |
|------|------|----------|-------------|
| `cluster_id` | string | no | Override the default Kafka cluster ID |
### Example
```lua
local result = app.integrations.kafka.list_topics({})
for _, topic in ipairs(result.data or {}) do
print(topic.topic_name .. " (partitions: " .. topic.partitions_count .. ")")
end
```
---
## get_topic
Get full details of a specific Kafka topic.
### Parameters
| Name | Type | Required | Description |
|------|------|----------|-------------|
| `topic_name` | string | yes | The name of the topic to retrieve |
| `cluster_id` | string | no | Override the default Kafka cluster ID |
### Example
```lua
local result = app.integrations.kafka.get_topic({
topic_name = "orders"
})
print("Topic: " .. result.topic_name)
print("Partitions: " .. result.partitions_count)
print("Replication: " .. (result.replication_factor or "default"))
```
---
## create_topic
Create a new Kafka topic in a cluster.
### Parameters
| Name | Type | Required | Description |
|------|------|----------|-------------|
| `topic_name` | string | yes | The name for the new topic |
| `partitions_count` | integer | yes | Number of partitions (e.g., 6) |
| `replication_factor` | integer | no | Replication factor (e.g., 3 for production) |
| `configs` | object | no | JSON-encoded topic configs: retention.ms, cleanup.policy, etc. |
| `cluster_id` | string | no | Override the default Kafka cluster ID |
### Topic Config Options
Common configuration options:
```json
{
"retention.ms": "604800000",
"cleanup.policy": "delete",
"max.message.bytes": "1048576"
}
```
### Example
```lua
local result = app.integrations.kafka.create_topic({
topic_name = "events",
partitions_count = 6,
replication_factor = 3,
configs = '{"retention.ms":"604800000","cleanup.policy":"delete"}'
})
print("Created topic: " .. result.topic_name)
```
---
## list_clusters
List Kafka clusters in your Confluent Cloud environment.
### Parameters
None.
### Example
```lua
local result = app.integrations.kafka.list_clusters({})
for _, cluster in ipairs(result.data or {}) do
print(cluster.cluster_id .. ": " .. (cluster.display_name or "unnamed"))
end
```
---
## get_cluster
Get details of a specific Kafka cluster.
### Parameters
| Name | Type | Required | Description |
|------|------|----------|-------------|
| `cluster_id` | string | no | The cluster ID to retrieve (uses default if not specified) |
### Example
```lua
local result = app.integrations.kafka.get_cluster({
cluster_id = "lkc-abc123"
})
print("Cluster: " .. (result.display_name or result.cluster_id))
print("Brokers: " .. (result.broker_count or "unknown"))
print("Controller: " .. (result.controller_id or "unknown"))
```
---
## list_producers
List producers for a specific Kafka topic.
### Parameters
| Name | Type | Required | Description |
|------|------|----------|-------------|
| `topic_name` | string | yes | The topic name to list producers for |
| `cluster_id` | string | no | Override the default Kafka cluster ID |
### Example
```lua
local result = app.integrations.kafka.list_producers({
topic_name = "orders"
})
for _, producer in ipairs(result.data or {}) do
print("Producer: " .. (producer.client_id or producer.producer_id))
end
```
---
## get_current_user
Get the currently authenticated Confluent Cloud user.
### Parameters
None.
### Example
```lua
local result = app.integrations.kafka.get_current_user({})
print("User: " .. (result.handle or "unknown"))
print("Name: " .. (result.full_name or "unknown"))
print("Email: " .. (result.email or "unknown"))
```
---
## Multi-Account Usage
If you have multiple Kafka accounts configured, use account-specific namespaces:
```lua
-- Default account (always works)
app.integrations.kafka.list_topics({})
-- Explicit default (portable across setups)
app.integrations.kafka.default.list_topics({})
-- Named accounts
app.integrations.kafka.production.list_topics({})
app.integrations.kafka.staging.list_topics({})
```
All functions are identical across accounts — only the credentials differ.
---
## Common Patterns
### Create a topic with production settings
```lua
local result = app.integrations.kafka.create_topic({
topic_name = "user-events",
partitions_count = 12,
replication_factor = 3,
configs = '{"retention.ms":"259200000","cleanup.policy":"compact,delete"}'
})
print("Created topic: " .. result.topic_name)
```
### List all topics and their partition counts
```lua
local result = app.integrations.kafka.list_topics({})
local topics = result.data or {}
print("Found " .. #topics .. " topics:")
for _, topic in ipairs(topics) do
print(" - " .. topic.topic_name .. " (" .. topic.partitions_count .. " partitions)")
end
```
### Check cluster health and verify credentials
```lua
-- Verify credentials
local user = app.integrations.kafka.get_current_user({})
print("Connected as: " .. (user.full_name or user.handle))
-- Get cluster details
local cluster = app.integrations.kafka.get_cluster({})
print("Cluster: " .. (cluster.display_name or cluster.cluster_id))
print("Status: " .. (cluster.status or "unknown"))
``` local result = app.integrations.kafka.list_topics({cluster_id = "example_cluster_id"})
print(result) Functions
list_topics Read
List Kafka topics in a cluster. Returns topic names, partition counts, replication factors, and status.
- Lua path
app.integrations.kafka.list_topics- Full name
kafka.kafka_list_topics
| Parameter | Type | Required | Description |
|---|---|---|---|
cluster_id | string | no | Override the default Kafka cluster ID. |
get_topic Read
Get full details of a specific Kafka topic by name. Returns partition count, replication factor, and topic configuration.
- Lua path
app.integrations.kafka.get_topic- Full name
kafka.kafka_get_topic
| Parameter | Type | Required | Description |
|---|---|---|---|
topic_name | string | yes | The name of the topic to retrieve. |
cluster_id | string | no | Override the default Kafka cluster ID. |
create_topic Write
Create a new Kafka topic in a cluster. Specify the topic name, partition count, and optional replication factor and configs.
- Lua path
app.integrations.kafka.create_topic- Full name
kafka.kafka_create_topic
| Parameter | Type | Required | Description |
|---|---|---|---|
topic_name | string | yes | The name for the new topic. |
partitions_count | integer | yes | Number of partitions for the topic (e.g., 6). |
replication_factor | integer | no | Replication factor (e.g., 3 for production). Defaults to the cluster default. |
configs | object | no | JSON-encoded topic configs: retention.ms, cleanup.policy, etc. |
cluster_id | string | no | Override the default Kafka cluster ID. |
list_clusters Read
List Kafka clusters in your Confluent Cloud environment. Returns cluster IDs, names, types, and status.
- Lua path
app.integrations.kafka.list_clusters- Full name
kafka.kafka_list_clusters
| Parameter | Type | Required | Description |
|---|---|---|---|
| No parameters. | |||
get_cluster Read
Get details of a specific Kafka cluster. Returns broker count, controller info, and cluster configuration.
- Lua path
app.integrations.kafka.get_cluster- Full name
kafka.kafka_get_cluster
| Parameter | Type | Required | Description |
|---|---|---|---|
cluster_id | string | no | The cluster ID to retrieve. Uses the default cluster if not specified. |
list_producers Read
List producers for a specific Kafka topic. Returns producer IDs, client IDs, and connection details.
- Lua path
app.integrations.kafka.list_producers- Full name
kafka.kafka_list_producers
| Parameter | Type | Required | Description |
|---|---|---|---|
topic_name | string | yes | The topic name to list producers for. |
cluster_id | string | no | Override the default Kafka cluster ID. |
get_current_user Read
Get the currently authenticated Confluent Cloud user. Useful for verifying credentials and identifying the connected account.
- Lua path
app.integrations.kafka.get_current_user- Full name
kafka.kafka_get_current_user
| Parameter | Type | Required | Description |
|---|---|---|---|
| No parameters. | |||