KosmoKrator

data

Apache Kafka Lua API for KosmoKrator Agents

Agent-facing Lua documentation and function reference for the Apache Kafka KosmoKrator integration.

Lua Namespace

Agents call this integration through app.integrations.kafka.*. Use lua_read_doc("integrations.kafka") inside KosmoKrator to discover the same reference at runtime.

Call Lua from the Headless CLI

Use kosmo integrations:lua when a shell script, CI job, cron job, or another coding CLI should run a deterministic Apache Kafka workflow without starting an interactive agent session.

Inline Lua call
kosmo integrations:lua --eval 'dump(app.integrations.kafka.list_topics({cluster_id = "example_cluster_id"}))' --json
Read Lua docs headlessly
kosmo integrations:lua --eval 'print(docs.read("kafka"))' --json
kosmo integrations:lua --eval 'print(docs.read("kafka.list_topics"))' --json

Workflow file

Put repeatable logic in a Lua file, then execute it with JSON output for the calling process.

workflow.lua
local kafka = app.integrations.kafka
local result = kafka.list_topics({cluster_id = "example_cluster_id"})

dump(result)
Run the workflow
kosmo integrations:lua workflow.lua --json
kosmo integrations:lua workflow.lua --force --json
Namespace note. integrations:lua exposes app.integrations.kafka, app.mcp.*, docs.*, json.*, and regex.*. Use app.integrations.kafka.default.* or app.integrations.kafka.work.* when you configured named credential accounts.

MCP-only Lua

If the script only needs configured MCP servers and does not need Apache Kafka, use the narrower mcp:lua command.

MCP Lua command
# Use mcp:lua for MCP-only scripts; use integrations:lua for this integration namespace.
kosmo mcp:lua --eval 'dump(mcp.servers())' --json

Agent-Facing Lua Docs

This is the rendered version of the full Lua documentation exposed to agents when they inspect the integration namespace.

Apache Kafka (Confluent Cloud) — Lua API Reference

list_topics

List Kafka topics in a cluster.

Parameters

NameTypeRequiredDescription
cluster_idstringnoOverride the default Kafka cluster ID

Example

local result = app.integrations.kafka.list_topics({})

for _, topic in ipairs(result.data or {}) do
  print(topic.topic_name .. " (partitions: " .. topic.partitions_count .. ")")
end

get_topic

Get full details of a specific Kafka topic.

Parameters

NameTypeRequiredDescription
topic_namestringyesThe name of the topic to retrieve
cluster_idstringnoOverride the default Kafka cluster ID

Example

local result = app.integrations.kafka.get_topic({
  topic_name = "orders"
})

print("Topic: " .. result.topic_name)
print("Partitions: " .. result.partitions_count)
print("Replication: " .. (result.replication_factor or "default"))

create_topic

Create a new Kafka topic in a cluster.

Parameters

NameTypeRequiredDescription
topic_namestringyesThe name for the new topic
partitions_countintegeryesNumber of partitions (e.g., 6)
replication_factorintegernoReplication factor (e.g., 3 for production)
configsobjectnoJSON-encoded topic configs: retention.ms, cleanup.policy, etc.
cluster_idstringnoOverride the default Kafka cluster ID

Topic Config Options

Common configuration options:

{
  "retention.ms": "604800000",
  "cleanup.policy": "delete",
  "max.message.bytes": "1048576"
}

Example

local result = app.integrations.kafka.create_topic({
  topic_name = "events",
  partitions_count = 6,
  replication_factor = 3,
  configs = '{"retention.ms":"604800000","cleanup.policy":"delete"}'
})

print("Created topic: " .. result.topic_name)

list_clusters

List Kafka clusters in your Confluent Cloud environment.

Parameters

None.

Example

local result = app.integrations.kafka.list_clusters({})

for _, cluster in ipairs(result.data or {}) do
  print(cluster.cluster_id .. ": " .. (cluster.display_name or "unnamed"))
end

get_cluster

Get details of a specific Kafka cluster.

Parameters

NameTypeRequiredDescription
cluster_idstringnoThe cluster ID to retrieve (uses default if not specified)

Example

local result = app.integrations.kafka.get_cluster({
  cluster_id = "lkc-abc123"
})

print("Cluster: " .. (result.display_name or result.cluster_id))
print("Brokers: " .. (result.broker_count or "unknown"))
print("Controller: " .. (result.controller_id or "unknown"))

list_producers

List producers for a specific Kafka topic.

Parameters

NameTypeRequiredDescription
topic_namestringyesThe topic name to list producers for
cluster_idstringnoOverride the default Kafka cluster ID

Example

local result = app.integrations.kafka.list_producers({
  topic_name = "orders"
})

for _, producer in ipairs(result.data or {}) do
  print("Producer: " .. (producer.client_id or producer.producer_id))
end

get_current_user

Get the currently authenticated Confluent Cloud user.

Parameters

None.

Example

local result = app.integrations.kafka.get_current_user({})

print("User: " .. (result.handle or "unknown"))
print("Name: " .. (result.full_name or "unknown"))
print("Email: " .. (result.email or "unknown"))

Multi-Account Usage

If you have multiple Kafka accounts configured, use account-specific namespaces:

-- Default account (always works)
app.integrations.kafka.list_topics({})

-- Explicit default (portable across setups)
app.integrations.kafka.default.list_topics({})

-- Named accounts
app.integrations.kafka.production.list_topics({})
app.integrations.kafka.staging.list_topics({})

All functions are identical across accounts — only the credentials differ.


Common Patterns

Create a topic with production settings

local result = app.integrations.kafka.create_topic({
  topic_name = "user-events",
  partitions_count = 12,
  replication_factor = 3,
  configs = '{"retention.ms":"259200000","cleanup.policy":"compact,delete"}'
})

print("Created topic: " .. result.topic_name)

List all topics and their partition counts

local result = app.integrations.kafka.list_topics({})

local topics = result.data or {}
print("Found " .. #topics .. " topics:")

for _, topic in ipairs(topics) do
  print("  - " .. topic.topic_name .. " (" .. topic.partitions_count .. " partitions)")
end

Check cluster health and verify credentials

-- Verify credentials
local user = app.integrations.kafka.get_current_user({})
print("Connected as: " .. (user.full_name or user.handle))

-- Get cluster details
local cluster = app.integrations.kafka.get_cluster({})
print("Cluster: " .. (cluster.display_name or cluster.cluster_id))
print("Status: " .. (cluster.status or "unknown"))
Raw agent markdown
# Apache Kafka (Confluent Cloud) — Lua API Reference

## list_topics

List Kafka topics in a cluster.

### Parameters

| Name | Type | Required | Description |
|------|------|----------|-------------|
| `cluster_id` | string | no | Override the default Kafka cluster ID |

### Example

```lua
local result = app.integrations.kafka.list_topics({})

for _, topic in ipairs(result.data or {}) do
  print(topic.topic_name .. " (partitions: " .. topic.partitions_count .. ")")
end
```

---

## get_topic

Get full details of a specific Kafka topic.

### Parameters

| Name | Type | Required | Description |
|------|------|----------|-------------|
| `topic_name` | string | yes | The name of the topic to retrieve |
| `cluster_id` | string | no | Override the default Kafka cluster ID |

### Example

```lua
local result = app.integrations.kafka.get_topic({
  topic_name = "orders"
})

print("Topic: " .. result.topic_name)
print("Partitions: " .. result.partitions_count)
print("Replication: " .. (result.replication_factor or "default"))
```

---

## create_topic

Create a new Kafka topic in a cluster.

### Parameters

| Name | Type | Required | Description |
|------|------|----------|-------------|
| `topic_name` | string | yes | The name for the new topic |
| `partitions_count` | integer | yes | Number of partitions (e.g., 6) |
| `replication_factor` | integer | no | Replication factor (e.g., 3 for production) |
| `configs` | object | no | JSON-encoded topic configs: retention.ms, cleanup.policy, etc. |
| `cluster_id` | string | no | Override the default Kafka cluster ID |

### Topic Config Options

Common configuration options:

```json
{
  "retention.ms": "604800000",
  "cleanup.policy": "delete",
  "max.message.bytes": "1048576"
}
```

### Example

```lua
local result = app.integrations.kafka.create_topic({
  topic_name = "events",
  partitions_count = 6,
  replication_factor = 3,
  configs = '{"retention.ms":"604800000","cleanup.policy":"delete"}'
})

print("Created topic: " .. result.topic_name)
```

---

## list_clusters

List Kafka clusters in your Confluent Cloud environment.

### Parameters

None.

### Example

```lua
local result = app.integrations.kafka.list_clusters({})

for _, cluster in ipairs(result.data or {}) do
  print(cluster.cluster_id .. ": " .. (cluster.display_name or "unnamed"))
end
```

---

## get_cluster

Get details of a specific Kafka cluster.

### Parameters

| Name | Type | Required | Description |
|------|------|----------|-------------|
| `cluster_id` | string | no | The cluster ID to retrieve (uses default if not specified) |

### Example

```lua
local result = app.integrations.kafka.get_cluster({
  cluster_id = "lkc-abc123"
})

print("Cluster: " .. (result.display_name or result.cluster_id))
print("Brokers: " .. (result.broker_count or "unknown"))
print("Controller: " .. (result.controller_id or "unknown"))
```

---

## list_producers

List producers for a specific Kafka topic.

### Parameters

| Name | Type | Required | Description |
|------|------|----------|-------------|
| `topic_name` | string | yes | The topic name to list producers for |
| `cluster_id` | string | no | Override the default Kafka cluster ID |

### Example

```lua
local result = app.integrations.kafka.list_producers({
  topic_name = "orders"
})

for _, producer in ipairs(result.data or {}) do
  print("Producer: " .. (producer.client_id or producer.producer_id))
end
```

---

## get_current_user

Get the currently authenticated Confluent Cloud user.

### Parameters

None.

### Example

```lua
local result = app.integrations.kafka.get_current_user({})

print("User: " .. (result.handle or "unknown"))
print("Name: " .. (result.full_name or "unknown"))
print("Email: " .. (result.email or "unknown"))
```

---

## Multi-Account Usage

If you have multiple Kafka accounts configured, use account-specific namespaces:

```lua
-- Default account (always works)
app.integrations.kafka.list_topics({})

-- Explicit default (portable across setups)
app.integrations.kafka.default.list_topics({})

-- Named accounts
app.integrations.kafka.production.list_topics({})
app.integrations.kafka.staging.list_topics({})
```

All functions are identical across accounts — only the credentials differ.

---

## Common Patterns

### Create a topic with production settings

```lua
local result = app.integrations.kafka.create_topic({
  topic_name = "user-events",
  partitions_count = 12,
  replication_factor = 3,
  configs = '{"retention.ms":"259200000","cleanup.policy":"compact,delete"}'
})

print("Created topic: " .. result.topic_name)
```

### List all topics and their partition counts

```lua
local result = app.integrations.kafka.list_topics({})

local topics = result.data or {}
print("Found " .. #topics .. " topics:")

for _, topic in ipairs(topics) do
  print("  - " .. topic.topic_name .. " (" .. topic.partitions_count .. " partitions)")
end
```

### Check cluster health and verify credentials

```lua
-- Verify credentials
local user = app.integrations.kafka.get_current_user({})
print("Connected as: " .. (user.full_name or user.handle))

-- Get cluster details
local cluster = app.integrations.kafka.get_cluster({})
print("Cluster: " .. (cluster.display_name or cluster.cluster_id))
print("Status: " .. (cluster.status or "unknown"))
```
Metadata-derived Lua example
local result = app.integrations.kafka.list_topics({cluster_id = "example_cluster_id"})
print(result)

Functions

list_topics Read

List Kafka topics in a cluster. Returns topic names, partition counts, replication factors, and status.

Lua path
app.integrations.kafka.list_topics
Full name
kafka.kafka_list_topics
ParameterTypeRequiredDescription
cluster_id string no Override the default Kafka cluster ID.
get_topic Read

Get full details of a specific Kafka topic by name. Returns partition count, replication factor, and topic configuration.

Lua path
app.integrations.kafka.get_topic
Full name
kafka.kafka_get_topic
ParameterTypeRequiredDescription
topic_name string yes The name of the topic to retrieve.
cluster_id string no Override the default Kafka cluster ID.
create_topic Write

Create a new Kafka topic in a cluster. Specify the topic name, partition count, and optional replication factor and configs.

Lua path
app.integrations.kafka.create_topic
Full name
kafka.kafka_create_topic
ParameterTypeRequiredDescription
topic_name string yes The name for the new topic.
partitions_count integer yes Number of partitions for the topic (e.g., 6).
replication_factor integer no Replication factor (e.g., 3 for production). Defaults to the cluster default.
configs object no JSON-encoded topic configs: retention.ms, cleanup.policy, etc.
cluster_id string no Override the default Kafka cluster ID.
list_clusters Read

List Kafka clusters in your Confluent Cloud environment. Returns cluster IDs, names, types, and status.

Lua path
app.integrations.kafka.list_clusters
Full name
kafka.kafka_list_clusters
ParameterTypeRequiredDescription
No parameters.
get_cluster Read

Get details of a specific Kafka cluster. Returns broker count, controller info, and cluster configuration.

Lua path
app.integrations.kafka.get_cluster
Full name
kafka.kafka_get_cluster
ParameterTypeRequiredDescription
cluster_id string no The cluster ID to retrieve. Uses the default cluster if not specified.
list_producers Read

List producers for a specific Kafka topic. Returns producer IDs, client IDs, and connection details.

Lua path
app.integrations.kafka.list_producers
Full name
kafka.kafka_list_producers
ParameterTypeRequiredDescription
topic_name string yes The topic name to list producers for.
cluster_id string no Override the default Kafka cluster ID.
get_current_user Read

Get the currently authenticated Confluent Cloud user. Useful for verifying credentials and identifying the connected account.

Lua path
app.integrations.kafka.get_current_user
Full name
kafka.kafka_get_current_user
ParameterTypeRequiredDescription
No parameters.