I have problem granting permissions to kafka confluent resource cluster using sarama. My code is below, when I run it it gives me an error:
error: kafka server: failed to create one or more ACL rules: kafka server: This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. See the broker logs for more details
When I give permissions to resource topic, group , transactionid, it works fine. It only fails in the resouce cluster. This is my code
<code>func CheckGrant() {
// struct containing acl for kafka
tempResource := sarama.Resource{
ResourceName: "test9",
ResourceType: sarama.AclResourceCluster,
ResourcePatternType: sarama.AclPatternLiteral,
}
// Creating struct final contain acl for kafka
Acl := sarama.Acl{
PermissionType: sarama.AclPermissionAllow,
Host: "*",
Principal: "User:test9",
Operation: sarama.AclOperationRead,
}
fmt.Println(tempResource)
fmt.Println(Acl)
// Connect to kafka
adminclient, err := kafka.AdminKafka(bootstrapServer, saslEnabled, saslUser, saslPassword, saslMechanism, saslProtocol)
if err != nil {
log.Fatalf("Failed to create admin client: %v", err)
}
defer adminclient.Close()
err = adminclient.CreateACL(tempResource, Acl)
if err != nil {
fmt.Println("error:", err)
}
}
</code>
<code>func CheckGrant() {
// struct containing acl for kafka
tempResource := sarama.Resource{
ResourceName: "test9",
ResourceType: sarama.AclResourceCluster,
ResourcePatternType: sarama.AclPatternLiteral,
}
// Creating struct final contain acl for kafka
Acl := sarama.Acl{
PermissionType: sarama.AclPermissionAllow,
Host: "*",
Principal: "User:test9",
Operation: sarama.AclOperationRead,
}
fmt.Println(tempResource)
fmt.Println(Acl)
// Connect to kafka
adminclient, err := kafka.AdminKafka(bootstrapServer, saslEnabled, saslUser, saslPassword, saslMechanism, saslProtocol)
if err != nil {
log.Fatalf("Failed to create admin client: %v", err)
}
defer adminclient.Close()
err = adminclient.CreateACL(tempResource, Acl)
if err != nil {
fmt.Println("error:", err)
}
}
</code>
func CheckGrant() {
// struct containing acl for kafka
tempResource := sarama.Resource{
ResourceName: "test9",
ResourceType: sarama.AclResourceCluster,
ResourcePatternType: sarama.AclPatternLiteral,
}
// Creating struct final contain acl for kafka
Acl := sarama.Acl{
PermissionType: sarama.AclPermissionAllow,
Host: "*",
Principal: "User:test9",
Operation: sarama.AclOperationRead,
}
fmt.Println(tempResource)
fmt.Println(Acl)
// Connect to kafka
adminclient, err := kafka.AdminKafka(bootstrapServer, saslEnabled, saslUser, saslPassword, saslMechanism, saslProtocol)
if err != nil {
log.Fatalf("Failed to create admin client: %v", err)
}
defer adminclient.Close()
err = adminclient.CreateACL(tempResource, Acl)
if err != nil {
fmt.Println("error:", err)
}
}
I still haven’t found the error, I checked the log in the broker, but nothing is displayed.