You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1614 lines
56 KiB
1614 lines
56 KiB
/** |
|
* Copyright 2018 Confluent Inc. |
|
* |
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
* you may not use this file except in compliance with the License. |
|
* You may obtain a copy of the License at |
|
* |
|
* http://www.apache.org/licenses/LICENSE-2.0 |
|
* |
|
* Unless required by applicable law or agreed to in writing, software |
|
* distributed under the License is distributed on an "AS IS" BASIS, |
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
* See the License for the specific language governing permissions and |
|
* limitations under the License. |
|
*/ |
|
|
|
package kafka |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"strings" |
|
"time" |
|
"unsafe" |
|
) |
|
|
|
/* |
|
#include "select_rdkafka.h" |
|
#include <stdlib.h> |
|
|
|
static const rd_kafka_topic_result_t * |
|
topic_result_by_idx (const rd_kafka_topic_result_t **topics, size_t cnt, size_t idx) { |
|
if (idx >= cnt) |
|
return NULL; |
|
return topics[idx]; |
|
} |
|
|
|
static const rd_kafka_ConfigResource_t * |
|
ConfigResource_by_idx (const rd_kafka_ConfigResource_t **res, size_t cnt, size_t idx) { |
|
if (idx >= cnt) |
|
return NULL; |
|
return res[idx]; |
|
} |
|
|
|
static const rd_kafka_ConfigEntry_t * |
|
ConfigEntry_by_idx (const rd_kafka_ConfigEntry_t **entries, size_t cnt, size_t idx) { |
|
if (idx >= cnt) |
|
return NULL; |
|
return entries[idx]; |
|
} |
|
|
|
static const rd_kafka_acl_result_t * |
|
acl_result_by_idx (const rd_kafka_acl_result_t **acl_results, size_t cnt, size_t idx) { |
|
if (idx >= cnt) |
|
return NULL; |
|
return acl_results[idx]; |
|
} |
|
|
|
static const rd_kafka_DeleteAcls_result_response_t * |
|
DeleteAcls_result_response_by_idx (const rd_kafka_DeleteAcls_result_response_t **delete_acls_result_responses, size_t cnt, size_t idx) { |
|
if (idx >= cnt) |
|
return NULL; |
|
return delete_acls_result_responses[idx]; |
|
} |
|
|
|
static const rd_kafka_AclBinding_t * |
|
AclBinding_by_idx (const rd_kafka_AclBinding_t **acl_bindings, size_t cnt, size_t idx) { |
|
if (idx >= cnt) |
|
return NULL; |
|
return acl_bindings[idx]; |
|
} |
|
*/ |
|
import "C" |
|
|
|
// AdminClient is derived from an existing Producer or Consumer |
|
type AdminClient struct { |
|
handle *handle |
|
isDerived bool // Derived from existing client handle |
|
} |
|
|
|
func durationToMilliseconds(t time.Duration) int { |
|
if t > 0 { |
|
return (int)(t.Seconds() * 1000.0) |
|
} |
|
return (int)(t) |
|
} |
|
|
|
// TopicResult provides per-topic operation result (error) information. |
|
type TopicResult struct { |
|
// Topic name |
|
Topic string |
|
// Error, if any, of result. Check with `Error.Code() != ErrNoError`. |
|
Error Error |
|
} |
|
|
|
// String returns a human-readable representation of a TopicResult. |
|
func (t TopicResult) String() string { |
|
if t.Error.code == 0 { |
|
return t.Topic |
|
} |
|
return fmt.Sprintf("%s (%s)", t.Topic, t.Error.str) |
|
} |
|
|
|
// TopicSpecification holds parameters for creating a new topic. |
|
// TopicSpecification is analogous to NewTopic in the Java Topic Admin API. |
|
type TopicSpecification struct { |
|
// Topic name to create. |
|
Topic string |
|
// Number of partitions in topic. |
|
NumPartitions int |
|
// Default replication factor for the topic's partitions, or zero |
|
// if an explicit ReplicaAssignment is set. |
|
ReplicationFactor int |
|
// (Optional) Explicit replica assignment. The outer array is |
|
// indexed by the partition number, while the inner per-partition array |
|
// contains the replica broker ids. The first broker in each |
|
// broker id list will be the preferred replica. |
|
ReplicaAssignment [][]int32 |
|
// Topic configuration. |
|
Config map[string]string |
|
} |
|
|
|
// PartitionsSpecification holds parameters for creating additional partitions for a topic. |
|
// PartitionsSpecification is analogous to NewPartitions in the Java Topic Admin API. |
|
type PartitionsSpecification struct { |
|
// Topic to create more partitions for. |
|
Topic string |
|
// New partition count for topic, must be higher than current partition count. |
|
IncreaseTo int |
|
// (Optional) Explicit replica assignment. The outer array is |
|
// indexed by the new partition index (i.e., 0 for the first added |
|
// partition), while the inner per-partition array |
|
// contains the replica broker ids. The first broker in each |
|
// broker id list will be the preferred replica. |
|
ReplicaAssignment [][]int32 |
|
} |
|
|
|
// ResourceType represents an Apache Kafka resource type |
|
type ResourceType int |
|
|
|
const ( |
|
// ResourceUnknown - Unknown |
|
ResourceUnknown = ResourceType(C.RD_KAFKA_RESOURCE_UNKNOWN) |
|
// ResourceAny - match any resource type (DescribeConfigs) |
|
ResourceAny = ResourceType(C.RD_KAFKA_RESOURCE_ANY) |
|
// ResourceTopic - Topic |
|
ResourceTopic = ResourceType(C.RD_KAFKA_RESOURCE_TOPIC) |
|
// ResourceGroup - Group |
|
ResourceGroup = ResourceType(C.RD_KAFKA_RESOURCE_GROUP) |
|
// ResourceBroker - Broker |
|
ResourceBroker = ResourceType(C.RD_KAFKA_RESOURCE_BROKER) |
|
) |
|
|
|
// String returns the human-readable representation of a ResourceType |
|
func (t ResourceType) String() string { |
|
return C.GoString(C.rd_kafka_ResourceType_name(C.rd_kafka_ResourceType_t(t))) |
|
} |
|
|
|
// ResourceTypeFromString translates a resource type name/string to |
|
// a ResourceType value. |
|
func ResourceTypeFromString(typeString string) (ResourceType, error) { |
|
switch strings.ToUpper(typeString) { |
|
case "ANY": |
|
return ResourceAny, nil |
|
case "TOPIC": |
|
return ResourceTopic, nil |
|
case "GROUP": |
|
return ResourceGroup, nil |
|
case "BROKER": |
|
return ResourceBroker, nil |
|
default: |
|
return ResourceUnknown, NewError(ErrInvalidArg, "Unknown resource type", false) |
|
} |
|
} |
|
|
|
// ConfigSource represents an Apache Kafka config source |
|
type ConfigSource int |
|
|
|
const ( |
|
// ConfigSourceUnknown is the default value |
|
ConfigSourceUnknown = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG) |
|
// ConfigSourceDynamicTopic is dynamic topic config that is configured for a specific topic |
|
ConfigSourceDynamicTopic = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG) |
|
// ConfigSourceDynamicBroker is dynamic broker config that is configured for a specific broker |
|
ConfigSourceDynamicBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG) |
|
// ConfigSourceDynamicDefaultBroker is dynamic broker config that is configured as default for all brokers in the cluster |
|
ConfigSourceDynamicDefaultBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG) |
|
// ConfigSourceStaticBroker is static broker config provided as broker properties at startup (e.g. from server.properties file) |
|
ConfigSourceStaticBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG) |
|
// ConfigSourceDefault is built-in default configuration for configs that have a default value |
|
ConfigSourceDefault = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG) |
|
) |
|
|
|
// String returns the human-readable representation of a ConfigSource type |
|
func (t ConfigSource) String() string { |
|
return C.GoString(C.rd_kafka_ConfigSource_name(C.rd_kafka_ConfigSource_t(t))) |
|
} |
|
|
|
// ConfigResource holds parameters for altering an Apache Kafka configuration resource |
|
type ConfigResource struct { |
|
// Type of resource to set. |
|
Type ResourceType |
|
// Name of resource to set. |
|
Name string |
|
// Config entries to set. |
|
// Configuration updates are atomic, any configuration property not provided |
|
// here will be reverted (by the broker) to its default value. |
|
// Use DescribeConfigs to retrieve the list of current configuration entry values. |
|
Config []ConfigEntry |
|
} |
|
|
|
// String returns a human-readable representation of a ConfigResource |
|
func (c ConfigResource) String() string { |
|
return fmt.Sprintf("Resource(%s, %s)", c.Type, c.Name) |
|
} |
|
|
|
// AlterOperation specifies the operation to perform on the ConfigEntry. |
|
// Currently only AlterOperationSet. |
|
type AlterOperation int |
|
|
|
const ( |
|
// AlterOperationSet sets/overwrites the configuration setting. |
|
AlterOperationSet = iota |
|
) |
|
|
|
// String returns the human-readable representation of an AlterOperation |
|
func (o AlterOperation) String() string { |
|
switch o { |
|
case AlterOperationSet: |
|
return "Set" |
|
default: |
|
return fmt.Sprintf("Unknown%d?", int(o)) |
|
} |
|
} |
|
|
|
// ConfigEntry holds parameters for altering a resource's configuration. |
|
type ConfigEntry struct { |
|
// Name of configuration entry, e.g., topic configuration property name. |
|
Name string |
|
// Value of configuration entry. |
|
Value string |
|
// Operation to perform on the entry. |
|
Operation AlterOperation |
|
} |
|
|
|
// StringMapToConfigEntries creates a new map of ConfigEntry objects from the |
|
// provided string map. The AlterOperation is set on each created entry. |
|
func StringMapToConfigEntries(stringMap map[string]string, operation AlterOperation) []ConfigEntry { |
|
var ceList []ConfigEntry |
|
|
|
for k, v := range stringMap { |
|
ceList = append(ceList, ConfigEntry{Name: k, Value: v, Operation: operation}) |
|
} |
|
|
|
return ceList |
|
} |
|
|
|
// String returns a human-readable representation of a ConfigEntry. |
|
func (c ConfigEntry) String() string { |
|
return fmt.Sprintf("%v %s=\"%s\"", c.Operation, c.Name, c.Value) |
|
} |
|
|
|
// ConfigEntryResult contains the result of a single configuration entry from a |
|
// DescribeConfigs request. |
|
type ConfigEntryResult struct { |
|
// Name of configuration entry, e.g., topic configuration property name. |
|
Name string |
|
// Value of configuration entry. |
|
Value string |
|
// Source indicates the configuration source. |
|
Source ConfigSource |
|
// IsReadOnly indicates whether the configuration entry can be altered. |
|
IsReadOnly bool |
|
// IsSensitive indicates whether the configuration entry contains sensitive information, in which case the value will be unset. |
|
IsSensitive bool |
|
// IsSynonym indicates whether the configuration entry is a synonym for another configuration property. |
|
IsSynonym bool |
|
// Synonyms contains a map of configuration entries that are synonyms to this configuration entry. |
|
Synonyms map[string]ConfigEntryResult |
|
} |
|
|
|
// String returns a human-readable representation of a ConfigEntryResult. |
|
func (c ConfigEntryResult) String() string { |
|
return fmt.Sprintf("%s=\"%s\"", c.Name, c.Value) |
|
} |
|
|
|
// setFromC sets up a ConfigEntryResult from a C ConfigEntry |
|
func configEntryResultFromC(cEntry *C.rd_kafka_ConfigEntry_t) (entry ConfigEntryResult) { |
|
entry.Name = C.GoString(C.rd_kafka_ConfigEntry_name(cEntry)) |
|
cValue := C.rd_kafka_ConfigEntry_value(cEntry) |
|
if cValue != nil { |
|
entry.Value = C.GoString(cValue) |
|
} |
|
entry.Source = ConfigSource(C.rd_kafka_ConfigEntry_source(cEntry)) |
|
entry.IsReadOnly = cint2bool(C.rd_kafka_ConfigEntry_is_read_only(cEntry)) |
|
entry.IsSensitive = cint2bool(C.rd_kafka_ConfigEntry_is_sensitive(cEntry)) |
|
entry.IsSynonym = cint2bool(C.rd_kafka_ConfigEntry_is_synonym(cEntry)) |
|
|
|
var cSynCnt C.size_t |
|
cSyns := C.rd_kafka_ConfigEntry_synonyms(cEntry, &cSynCnt) |
|
if cSynCnt > 0 { |
|
entry.Synonyms = make(map[string]ConfigEntryResult) |
|
} |
|
|
|
for si := 0; si < int(cSynCnt); si++ { |
|
cSyn := C.ConfigEntry_by_idx(cSyns, cSynCnt, C.size_t(si)) |
|
Syn := configEntryResultFromC(cSyn) |
|
entry.Synonyms[Syn.Name] = Syn |
|
} |
|
|
|
return entry |
|
} |
|
|
|
// ConfigResourceResult provides the result for a resource from a AlterConfigs or |
|
// DescribeConfigs request. |
|
type ConfigResourceResult struct { |
|
// Type of returned result resource. |
|
Type ResourceType |
|
// Name of returned result resource. |
|
Name string |
|
// Error, if any, of returned result resource. |
|
Error Error |
|
// Config entries, if any, of returned result resource. |
|
Config map[string]ConfigEntryResult |
|
} |
|
|
|
// String returns a human-readable representation of a ConfigResourceResult. |
|
func (c ConfigResourceResult) String() string { |
|
if c.Error.Code() != 0 { |
|
return fmt.Sprintf("ResourceResult(%s, %s, \"%v\")", c.Type, c.Name, c.Error) |
|
|
|
} |
|
return fmt.Sprintf("ResourceResult(%s, %s, %d config(s))", c.Type, c.Name, len(c.Config)) |
|
} |
|
|
|
// ResourcePatternType enumerates the different types of Kafka resource patterns. |
|
type ResourcePatternType int |
|
|
|
const ( |
|
// ResourcePatternTypeUnknown is a resource pattern type not known or not set. |
|
ResourcePatternTypeUnknown = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_UNKNOWN) |
|
// ResourcePatternTypeAny matches any resource, used for lookups. |
|
ResourcePatternTypeAny = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_ANY) |
|
// ResourcePatternTypeMatch will perform pattern matching |
|
ResourcePatternTypeMatch = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_MATCH) |
|
// ResourcePatternTypeLiteral matches a literal resource name |
|
ResourcePatternTypeLiteral = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_LITERAL) |
|
// ResourcePatternTypePrefixed matches a prefixed resource name |
|
ResourcePatternTypePrefixed = ResourcePatternType(C.RD_KAFKA_RESOURCE_PATTERN_PREFIXED) |
|
) |
|
|
|
// String returns the human-readable representation of a ResourcePatternType |
|
func (t ResourcePatternType) String() string { |
|
return C.GoString(C.rd_kafka_ResourcePatternType_name(C.rd_kafka_ResourcePatternType_t(t))) |
|
} |
|
|
|
// ResourcePatternTypeFromString translates a resource pattern type name to |
|
// a ResourcePatternType value. |
|
func ResourcePatternTypeFromString(patternTypeString string) (ResourcePatternType, error) { |
|
switch strings.ToUpper(patternTypeString) { |
|
case "ANY": |
|
return ResourcePatternTypeAny, nil |
|
case "MATCH": |
|
return ResourcePatternTypeMatch, nil |
|
case "LITERAL": |
|
return ResourcePatternTypeLiteral, nil |
|
case "PREFIXED": |
|
return ResourcePatternTypePrefixed, nil |
|
default: |
|
return ResourcePatternTypeUnknown, NewError(ErrInvalidArg, "Unknown resource pattern type", false) |
|
} |
|
} |
|
|
|
// ACLOperation enumerates the different types of ACL operation. |
|
type ACLOperation int |
|
|
|
const ( |
|
// ACLOperationUnknown represents an unknown or unset operation |
|
ACLOperationUnknown = ACLOperation(C.RD_KAFKA_ACL_OPERATION_UNKNOWN) |
|
// ACLOperationAny in a filter, matches any ACLOperation |
|
ACLOperationAny = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ANY) |
|
// ACLOperationAll represents all the operations |
|
ACLOperationAll = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALL) |
|
// ACLOperationRead a read operation |
|
ACLOperationRead = ACLOperation(C.RD_KAFKA_ACL_OPERATION_READ) |
|
// ACLOperationWrite represents a write operation |
|
ACLOperationWrite = ACLOperation(C.RD_KAFKA_ACL_OPERATION_WRITE) |
|
// ACLOperationCreate represents a create operation |
|
ACLOperationCreate = ACLOperation(C.RD_KAFKA_ACL_OPERATION_CREATE) |
|
// ACLOperationDelete represents a delete operation |
|
ACLOperationDelete = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DELETE) |
|
// ACLOperationAlter represents an alter operation |
|
ACLOperationAlter = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALTER) |
|
// ACLOperationDescribe represents a describe operation |
|
ACLOperationDescribe = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DESCRIBE) |
|
// ACLOperationClusterAction represents a cluster action operation |
|
ACLOperationClusterAction = ACLOperation(C.RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION) |
|
// ACLOperationDescribeConfigs represents a describe configs operation |
|
ACLOperationDescribeConfigs = ACLOperation(C.RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS) |
|
// ACLOperationAlterConfigs represents an alter configs operation |
|
ACLOperationAlterConfigs = ACLOperation(C.RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS) |
|
// ACLOperationIdempotentWrite represents an idempotent write operation |
|
ACLOperationIdempotentWrite = ACLOperation(C.RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE) |
|
) |
|
|
|
// String returns the human-readable representation of an ACLOperation |
|
func (o ACLOperation) String() string { |
|
return C.GoString(C.rd_kafka_AclOperation_name(C.rd_kafka_AclOperation_t(o))) |
|
} |
|
|
|
// ACLOperationFromString translates a ACL operation name to |
|
// a ACLOperation value. |
|
func ACLOperationFromString(aclOperationString string) (ACLOperation, error) { |
|
switch strings.ToUpper(aclOperationString) { |
|
case "ANY": |
|
return ACLOperationAny, nil |
|
case "ALL": |
|
return ACLOperationAll, nil |
|
case "READ": |
|
return ACLOperationRead, nil |
|
case "WRITE": |
|
return ACLOperationWrite, nil |
|
case "CREATE": |
|
return ACLOperationCreate, nil |
|
case "DELETE": |
|
return ACLOperationDelete, nil |
|
case "ALTER": |
|
return ACLOperationAlter, nil |
|
case "DESCRIBE": |
|
return ACLOperationDescribe, nil |
|
case "CLUSTER_ACTION": |
|
return ACLOperationClusterAction, nil |
|
case "DESCRIBE_CONFIGS": |
|
return ACLOperationDescribeConfigs, nil |
|
case "ALTER_CONFIGS": |
|
return ACLOperationAlterConfigs, nil |
|
case "IDEMPOTENT_WRITE": |
|
return ACLOperationIdempotentWrite, nil |
|
default: |
|
return ACLOperationUnknown, NewError(ErrInvalidArg, "Unknown ACL operation", false) |
|
} |
|
} |
|
|
|
// ACLPermissionType enumerates the different types of ACL permission types. |
|
type ACLPermissionType int |
|
|
|
const ( |
|
// ACLPermissionTypeUnknown represents an unknown ACLPermissionType |
|
ACLPermissionTypeUnknown = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN) |
|
// ACLPermissionTypeAny in a filter, matches any ACLPermissionType |
|
ACLPermissionTypeAny = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_ANY) |
|
// ACLPermissionTypeDeny disallows access |
|
ACLPermissionTypeDeny = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_DENY) |
|
// ACLPermissionTypeAllow grants access |
|
ACLPermissionTypeAllow = ACLPermissionType(C.RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW) |
|
) |
|
|
|
// String returns the human-readable representation of an ACLPermissionType |
|
func (o ACLPermissionType) String() string { |
|
return C.GoString(C.rd_kafka_AclPermissionType_name(C.rd_kafka_AclPermissionType_t(o))) |
|
} |
|
|
|
// ACLPermissionTypeFromString translates a ACL permission type name to |
|
// a ACLPermissionType value. |
|
func ACLPermissionTypeFromString(aclPermissionTypeString string) (ACLPermissionType, error) { |
|
switch strings.ToUpper(aclPermissionTypeString) { |
|
case "ANY": |
|
return ACLPermissionTypeAny, nil |
|
case "DENY": |
|
return ACLPermissionTypeDeny, nil |
|
case "ALLOW": |
|
return ACLPermissionTypeAllow, nil |
|
default: |
|
return ACLPermissionTypeUnknown, NewError(ErrInvalidArg, "Unknown ACL permission type", false) |
|
} |
|
} |
|
|
|
// ACLBinding specifies the operation and permission type for a specific principal |
|
// over one or more resources of the same type. Used by `AdminClient.CreateACLs`, |
|
// returned by `AdminClient.DescribeACLs` and `AdminClient.DeleteACLs`. |
|
type ACLBinding struct { |
|
Type ResourceType // The resource type. |
|
// The resource name, which depends on the resource type. |
|
// For ResourceBroker the resource name is the broker id. |
|
Name string |
|
ResourcePatternType ResourcePatternType // The resource pattern, relative to the name. |
|
Principal string // The principal this ACLBinding refers to. |
|
Host string // The host that the call is allowed to come from. |
|
Operation ACLOperation // The operation/s specified by this binding. |
|
PermissionType ACLPermissionType // The permission type for the specified operation. |
|
} |
|
|
|
// ACLBindingFilter specifies a filter used to return a list of ACL bindings matching some or all of its attributes. |
|
// Used by `AdminClient.DescribeACLs` and `AdminClient.DeleteACLs`. |
|
type ACLBindingFilter = ACLBinding |
|
|
|
// ACLBindings is a slice of ACLBinding that also implements |
|
// the sort interface |
|
type ACLBindings []ACLBinding |
|
|
|
// ACLBindingFilters is a slice of ACLBindingFilter that also implements |
|
// the sort interface |
|
type ACLBindingFilters []ACLBindingFilter |
|
|
|
func (a ACLBindings) Len() int { |
|
return len(a) |
|
} |
|
|
|
func (a ACLBindings) Less(i, j int) bool { |
|
if a[i].Type != a[j].Type { |
|
return a[i].Type < a[j].Type |
|
} |
|
if a[i].Name != a[j].Name { |
|
return a[i].Name < a[j].Name |
|
} |
|
if a[i].ResourcePatternType != a[j].ResourcePatternType { |
|
return a[i].ResourcePatternType < a[j].ResourcePatternType |
|
} |
|
if a[i].Principal != a[j].Principal { |
|
return a[i].Principal < a[j].Principal |
|
} |
|
if a[i].Host != a[j].Host { |
|
return a[i].Host < a[j].Host |
|
} |
|
if a[i].Operation != a[j].Operation { |
|
return a[i].Operation < a[j].Operation |
|
} |
|
if a[i].PermissionType != a[j].PermissionType { |
|
return a[i].PermissionType < a[j].PermissionType |
|
} |
|
return true |
|
} |
|
|
|
func (a ACLBindings) Swap(i, j int) { |
|
a[i], a[j] = a[j], a[i] |
|
} |
|
|
|
// CreateACLResult provides create ACL error information. |
|
type CreateACLResult struct { |
|
// Error, if any, of result. Check with `Error.Code() != ErrNoError`. |
|
Error Error |
|
} |
|
|
|
// DescribeACLsResult provides describe ACLs result or error information. |
|
type DescribeACLsResult struct { |
|
// Slice of ACL bindings matching the provided filter |
|
ACLBindings ACLBindings |
|
// Error, if any, of result. Check with `Error.Code() != ErrNoError`. |
|
Error Error |
|
} |
|
|
|
// DeleteACLsResult provides delete ACLs result or error information. |
|
type DeleteACLsResult = DescribeACLsResult |
|
|
|
// waitResult waits for a result event on cQueue or the ctx to be cancelled, whichever happens |
|
// first. |
|
// The returned result event is checked for errors its error is returned if set. |
|
func (a *AdminClient) waitResult(ctx context.Context, cQueue *C.rd_kafka_queue_t, cEventType C.rd_kafka_event_type_t) (rkev *C.rd_kafka_event_t, err error) { |
|
|
|
resultChan := make(chan *C.rd_kafka_event_t) |
|
closeChan := make(chan bool) // never written to, just closed |
|
|
|
go func() { |
|
for { |
|
select { |
|
case _, ok := <-closeChan: |
|
if !ok { |
|
// Context cancelled/timed out |
|
close(resultChan) |
|
return |
|
} |
|
|
|
default: |
|
// Wait for result event for at most 50ms |
|
// to avoid blocking for too long if |
|
// context is cancelled. |
|
rkev := C.rd_kafka_queue_poll(cQueue, 50) |
|
if rkev != nil { |
|
resultChan <- rkev |
|
close(resultChan) |
|
return |
|
} |
|
} |
|
} |
|
}() |
|
|
|
select { |
|
case rkev = <-resultChan: |
|
// Result type check |
|
if cEventType != C.rd_kafka_event_type(rkev) { |
|
err = newErrorFromString(ErrInvalidType, |
|
fmt.Sprintf("Expected %d result event, not %d", (int)(cEventType), (int)(C.rd_kafka_event_type(rkev)))) |
|
C.rd_kafka_event_destroy(rkev) |
|
return nil, err |
|
} |
|
|
|
// Generic error handling |
|
cErr := C.rd_kafka_event_error(rkev) |
|
if cErr != 0 { |
|
err = newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev)) |
|
C.rd_kafka_event_destroy(rkev) |
|
return nil, err |
|
} |
|
close(closeChan) |
|
return rkev, nil |
|
case <-ctx.Done(): |
|
// signal close to go-routine |
|
close(closeChan) |
|
// wait for close from go-routine to make sure it is done |
|
// using cQueue before we return. |
|
rkev, ok := <-resultChan |
|
if ok { |
|
// throw away result since context was cancelled |
|
C.rd_kafka_event_destroy(rkev) |
|
} |
|
return nil, ctx.Err() |
|
} |
|
} |
|
|
|
// cToTopicResults converts a C topic_result_t array to Go TopicResult list. |
|
func (a *AdminClient) cToTopicResults(cTopicRes **C.rd_kafka_topic_result_t, cCnt C.size_t) (result []TopicResult, err error) { |
|
|
|
result = make([]TopicResult, int(cCnt)) |
|
|
|
for i := 0; i < int(cCnt); i++ { |
|
cTopic := C.topic_result_by_idx(cTopicRes, cCnt, C.size_t(i)) |
|
result[i].Topic = C.GoString(C.rd_kafka_topic_result_name(cTopic)) |
|
result[i].Error = newErrorFromCString( |
|
C.rd_kafka_topic_result_error(cTopic), |
|
C.rd_kafka_topic_result_error_string(cTopic)) |
|
} |
|
|
|
return result, nil |
|
} |
|
|
|
// cConfigResourceToResult converts a C ConfigResource result array to Go ConfigResourceResult |
|
func (a *AdminClient) cConfigResourceToResult(cRes **C.rd_kafka_ConfigResource_t, cCnt C.size_t) (result []ConfigResourceResult, err error) { |
|
|
|
result = make([]ConfigResourceResult, int(cCnt)) |
|
|
|
for i := 0; i < int(cCnt); i++ { |
|
cRes := C.ConfigResource_by_idx(cRes, cCnt, C.size_t(i)) |
|
result[i].Type = ResourceType(C.rd_kafka_ConfigResource_type(cRes)) |
|
result[i].Name = C.GoString(C.rd_kafka_ConfigResource_name(cRes)) |
|
result[i].Error = newErrorFromCString( |
|
C.rd_kafka_ConfigResource_error(cRes), |
|
C.rd_kafka_ConfigResource_error_string(cRes)) |
|
var cConfigCnt C.size_t |
|
cConfigs := C.rd_kafka_ConfigResource_configs(cRes, &cConfigCnt) |
|
if cConfigCnt > 0 { |
|
result[i].Config = make(map[string]ConfigEntryResult) |
|
} |
|
for ci := 0; ci < int(cConfigCnt); ci++ { |
|
cEntry := C.ConfigEntry_by_idx(cConfigs, cConfigCnt, C.size_t(ci)) |
|
entry := configEntryResultFromC(cEntry) |
|
result[i].Config[entry.Name] = entry |
|
} |
|
} |
|
|
|
return result, nil |
|
} |
|
|
|
// ClusterID returns the cluster ID as reported in broker metadata. |
|
// |
|
// Note on cancellation: Although the underlying C function respects the |
|
// timeout, it currently cannot be manually cancelled. That means manually |
|
// cancelling the context will block until the C function call returns. |
|
// |
|
// Requires broker version >= 0.10.0. |
|
func (a *AdminClient) ClusterID(ctx context.Context) (clusterID string, err error) { |
|
responseChan := make(chan *C.char, 1) |
|
|
|
go func() { |
|
responseChan <- C.rd_kafka_clusterid(a.handle.rk, cTimeoutFromContext(ctx)) |
|
}() |
|
|
|
select { |
|
case <-ctx.Done(): |
|
if cClusterID := <-responseChan; cClusterID != nil { |
|
C.rd_kafka_mem_free(a.handle.rk, unsafe.Pointer(cClusterID)) |
|
} |
|
return "", ctx.Err() |
|
|
|
case cClusterID := <-responseChan: |
|
if cClusterID == nil { // C timeout |
|
<-ctx.Done() |
|
return "", ctx.Err() |
|
} |
|
defer C.rd_kafka_mem_free(a.handle.rk, unsafe.Pointer(cClusterID)) |
|
return C.GoString(cClusterID), nil |
|
} |
|
} |
|
|
|
// ControllerID returns the broker ID of the current controller as reported in |
|
// broker metadata. |
|
// |
|
// Note on cancellation: Although the underlying C function respects the |
|
// timeout, it currently cannot be manually cancelled. That means manually |
|
// cancelling the context will block until the C function call returns. |
|
// |
|
// Requires broker version >= 0.10.0. |
|
func (a *AdminClient) ControllerID(ctx context.Context) (controllerID int32, err error) { |
|
responseChan := make(chan int32, 1) |
|
|
|
go func() { |
|
responseChan <- int32(C.rd_kafka_controllerid(a.handle.rk, cTimeoutFromContext(ctx))) |
|
}() |
|
|
|
select { |
|
case <-ctx.Done(): |
|
<-responseChan |
|
return 0, ctx.Err() |
|
|
|
case controllerID := <-responseChan: |
|
if controllerID < 0 { // C timeout |
|
<-ctx.Done() |
|
return 0, ctx.Err() |
|
} |
|
return controllerID, nil |
|
} |
|
} |
|
|
|
// CreateTopics creates topics in cluster. |
|
// |
|
// The list of TopicSpecification objects define the per-topic partition count, replicas, etc. |
|
// |
|
// Topic creation is non-atomic and may succeed for some topics but fail for others, |
|
// make sure to check the result for topic-specific errors. |
|
// |
|
// Note: TopicSpecification is analogous to NewTopic in the Java Topic Admin API. |
|
func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, options ...CreateTopicsAdminOption) (result []TopicResult, err error) { |
|
cTopics := make([]*C.rd_kafka_NewTopic_t, len(topics)) |
|
|
|
cErrstrSize := C.size_t(512) |
|
cErrstr := (*C.char)(C.malloc(cErrstrSize)) |
|
defer C.free(unsafe.Pointer(cErrstr)) |
|
|
|
// Convert Go TopicSpecifications to C TopicSpecifications |
|
for i, topic := range topics { |
|
|
|
var cReplicationFactor C.int |
|
if topic.ReplicationFactor == 0 { |
|
cReplicationFactor = -1 |
|
} else { |
|
cReplicationFactor = C.int(topic.ReplicationFactor) |
|
} |
|
if topic.ReplicaAssignment != nil { |
|
if cReplicationFactor != -1 { |
|
return nil, newErrorFromString(ErrInvalidArg, |
|
"TopicSpecification.ReplicationFactor and TopicSpecification.ReplicaAssignment are mutually exclusive") |
|
} |
|
|
|
if len(topic.ReplicaAssignment) != topic.NumPartitions { |
|
return nil, newErrorFromString(ErrInvalidArg, |
|
"TopicSpecification.ReplicaAssignment must contain exactly TopicSpecification.NumPartitions partitions") |
|
} |
|
} |
|
|
|
cTopics[i] = C.rd_kafka_NewTopic_new( |
|
C.CString(topic.Topic), |
|
C.int(topic.NumPartitions), |
|
cReplicationFactor, |
|
cErrstr, cErrstrSize) |
|
if cTopics[i] == nil { |
|
return nil, newErrorFromString(ErrInvalidArg, |
|
fmt.Sprintf("Topic %s: %s", topic.Topic, C.GoString(cErrstr))) |
|
} |
|
|
|
defer C.rd_kafka_NewTopic_destroy(cTopics[i]) |
|
|
|
for p, replicas := range topic.ReplicaAssignment { |
|
cReplicas := make([]C.int32_t, len(replicas)) |
|
for ri, replica := range replicas { |
|
cReplicas[ri] = C.int32_t(replica) |
|
} |
|
cErr := C.rd_kafka_NewTopic_set_replica_assignment( |
|
cTopics[i], C.int32_t(p), |
|
(*C.int32_t)(&cReplicas[0]), C.size_t(len(cReplicas)), |
|
cErrstr, cErrstrSize) |
|
if cErr != 0 { |
|
return nil, newCErrorFromString(cErr, |
|
fmt.Sprintf("Failed to set replica assignment for topic %s partition %d: %s", topic.Topic, p, C.GoString(cErrstr))) |
|
} |
|
} |
|
|
|
for key, value := range topic.Config { |
|
cErr := C.rd_kafka_NewTopic_set_config( |
|
cTopics[i], |
|
C.CString(key), C.CString(value)) |
|
if cErr != 0 { |
|
return nil, newCErrorFromString(cErr, |
|
fmt.Sprintf("Failed to set config %s=%s for topic %s", key, value, topic.Topic)) |
|
} |
|
} |
|
} |
|
|
|
// Convert Go AdminOptions (if any) to C AdminOptions |
|
genericOptions := make([]AdminOption, len(options)) |
|
for i := range options { |
|
genericOptions[i] = options[i] |
|
} |
|
cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_CREATETOPICS, genericOptions) |
|
if err != nil { |
|
return nil, err |
|
} |
|
defer C.rd_kafka_AdminOptions_destroy(cOptions) |
|
|
|
// Create temporary queue for async operation |
|
cQueue := C.rd_kafka_queue_new(a.handle.rk) |
|
defer C.rd_kafka_queue_destroy(cQueue) |
|
|
|
// Asynchronous call |
|
C.rd_kafka_CreateTopics( |
|
a.handle.rk, |
|
(**C.rd_kafka_NewTopic_t)(&cTopics[0]), |
|
C.size_t(len(cTopics)), |
|
cOptions, |
|
cQueue) |
|
|
|
// Wait for result, error or context timeout |
|
rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_CREATETOPICS_RESULT) |
|
if err != nil { |
|
return nil, err |
|
} |
|
defer C.rd_kafka_event_destroy(rkev) |
|
|
|
cRes := C.rd_kafka_event_CreateTopics_result(rkev) |
|
|
|
// Convert result from C to Go |
|
var cCnt C.size_t |
|
cTopicRes := C.rd_kafka_CreateTopics_result_topics(cRes, &cCnt) |
|
|
|
return a.cToTopicResults(cTopicRes, cCnt) |
|
} |
|
|
|
// DeleteTopics deletes a batch of topics. |
|
// |
|
// This operation is not transactional and may succeed for a subset of topics while |
|
// failing others. |
|
// It may take several seconds after the DeleteTopics result returns success for |
|
// all the brokers to become aware that the topics are gone. During this time, |
|
// topic metadata and configuration may continue to return information about deleted topics. |
|
// |
|
// Requires broker version >= 0.10.1.0 |
|
func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options ...DeleteTopicsAdminOption) (result []TopicResult, err error) { |
|
cTopics := make([]*C.rd_kafka_DeleteTopic_t, len(topics)) |
|
|
|
cErrstrSize := C.size_t(512) |
|
cErrstr := (*C.char)(C.malloc(cErrstrSize)) |
|
defer C.free(unsafe.Pointer(cErrstr)) |
|
|
|
// Convert Go DeleteTopics to C DeleteTopics |
|
for i, topic := range topics { |
|
cTopics[i] = C.rd_kafka_DeleteTopic_new(C.CString(topic)) |
|
if cTopics[i] == nil { |
|
return nil, newErrorFromString(ErrInvalidArg, |
|
fmt.Sprintf("Invalid arguments for topic %s", topic)) |
|
} |
|
|
|
defer C.rd_kafka_DeleteTopic_destroy(cTopics[i]) |
|
} |
|
|
|
// Convert Go AdminOptions (if any) to C AdminOptions |
|
genericOptions := make([]AdminOption, len(options)) |
|
for i := range options { |
|
genericOptions[i] = options[i] |
|
} |
|
cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_DELETETOPICS, genericOptions) |
|
if err != nil { |
|
return nil, err |
|
} |
|
defer C.rd_kafka_AdminOptions_destroy(cOptions) |
|
|
|
// Create temporary queue for async operation |
|
cQueue := C.rd_kafka_queue_new(a.handle.rk) |
|
defer C.rd_kafka_queue_destroy(cQueue) |
|
|
|
// Asynchronous call |
|
C.rd_kafka_DeleteTopics( |
|
a.handle.rk, |
|
(**C.rd_kafka_DeleteTopic_t)(&cTopics[0]), |
|
C.size_t(len(cTopics)), |
|
cOptions, |
|
cQueue) |
|
|
|
// Wait for result, error or context timeout |
|
rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DELETETOPICS_RESULT) |
|
if err != nil { |
|
return nil, err |
|
} |
|
defer C.rd_kafka_event_destroy(rkev) |
|
|
|
cRes := C.rd_kafka_event_DeleteTopics_result(rkev) |
|
|
|
// Convert result from C to Go |
|
var cCnt C.size_t |
|
cTopicRes := C.rd_kafka_DeleteTopics_result_topics(cRes, &cCnt) |
|
|
|
return a.cToTopicResults(cTopicRes, cCnt) |
|
} |
|
|
|
// CreatePartitions creates additional partitions for topics. |
|
func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, options ...CreatePartitionsAdminOption) (result []TopicResult, err error) { |
|
cParts := make([]*C.rd_kafka_NewPartitions_t, len(partitions)) |
|
|
|
cErrstrSize := C.size_t(512) |
|
cErrstr := (*C.char)(C.malloc(cErrstrSize)) |
|
defer C.free(unsafe.Pointer(cErrstr)) |
|
|
|
// Convert Go PartitionsSpecification to C NewPartitions |
|
for i, part := range partitions { |
|
cParts[i] = C.rd_kafka_NewPartitions_new(C.CString(part.Topic), C.size_t(part.IncreaseTo), cErrstr, cErrstrSize) |
|
if cParts[i] == nil { |
|
return nil, newErrorFromString(ErrInvalidArg, |
|
fmt.Sprintf("Topic %s: %s", part.Topic, C.GoString(cErrstr))) |
|
} |
|
|
|
defer C.rd_kafka_NewPartitions_destroy(cParts[i]) |
|
|
|
for pidx, replicas := range part.ReplicaAssignment { |
|
cReplicas := make([]C.int32_t, len(replicas)) |
|
for ri, replica := range replicas { |
|
cReplicas[ri] = C.int32_t(replica) |
|
} |
|
cErr := C.rd_kafka_NewPartitions_set_replica_assignment( |
|
cParts[i], C.int32_t(pidx), |
|
(*C.int32_t)(&cReplicas[0]), C.size_t(len(cReplicas)), |
|
cErrstr, cErrstrSize) |
|
if cErr != 0 { |
|
return nil, newCErrorFromString(cErr, |
|
fmt.Sprintf("Failed to set replica assignment for topic %s new partition index %d: %s", part.Topic, pidx, C.GoString(cErrstr))) |
|
} |
|
} |
|
|
|
} |
|
|
|
// Convert Go AdminOptions (if any) to C AdminOptions |
|
genericOptions := make([]AdminOption, len(options)) |
|
for i := range options { |
|
genericOptions[i] = options[i] |
|
} |
|
cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, genericOptions) |
|
if err != nil { |
|
return nil, err |
|
} |
|
defer C.rd_kafka_AdminOptions_destroy(cOptions) |
|
|
|
// Create temporary queue for async operation |
|
cQueue := C.rd_kafka_queue_new(a.handle.rk) |
|
defer C.rd_kafka_queue_destroy(cQueue) |
|
|
|
// Asynchronous call |
|
C.rd_kafka_CreatePartitions( |
|
a.handle.rk, |
|
(**C.rd_kafka_NewPartitions_t)(&cParts[0]), |
|
C.size_t(len(cParts)), |
|
cOptions, |
|
cQueue) |
|
|
|
// Wait for result, error or context timeout |
|
rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT) |
|
if err != nil { |
|
return nil, err |
|
} |
|
defer C.rd_kafka_event_destroy(rkev) |
|
|
|
cRes := C.rd_kafka_event_CreatePartitions_result(rkev) |
|
|
|
// Convert result from C to Go |
|
var cCnt C.size_t |
|
cTopicRes := C.rd_kafka_CreatePartitions_result_topics(cRes, &cCnt) |
|
|
|
return a.cToTopicResults(cTopicRes, cCnt) |
|
} |
|
|
|
// AlterConfigs alters/updates cluster resource configuration. |
|
// |
|
// Updates are not transactional so they may succeed for a subset |
|
// of the provided resources while others fail. |
|
// The configuration for a particular resource is updated atomically, |
|
// replacing values using the provided ConfigEntrys and reverting |
|
// unspecified ConfigEntrys to their default values. |
|
// |
|
// Requires broker version >=0.11.0.0 |
|
// |
|
// AlterConfigs will replace all existing configuration for |
|
// the provided resources with the new configuration given, |
|
// reverting all other configuration to their default values. |
|
// |
|
// Multiple resources and resource types may be set, but at most one |
|
// resource of type ResourceBroker is allowed per call since these |
|
// resource requests must be sent to the broker specified in the resource. |
|
func (a *AdminClient) AlterConfigs(ctx context.Context, resources []ConfigResource, options ...AlterConfigsAdminOption) (result []ConfigResourceResult, err error) { |
|
cRes := make([]*C.rd_kafka_ConfigResource_t, len(resources)) |
|
|
|
cErrstrSize := C.size_t(512) |
|
cErrstr := (*C.char)(C.malloc(cErrstrSize)) |
|
defer C.free(unsafe.Pointer(cErrstr)) |
|
|
|
// Convert Go ConfigResources to C ConfigResources |
|
for i, res := range resources { |
|
cRes[i] = C.rd_kafka_ConfigResource_new( |
|
C.rd_kafka_ResourceType_t(res.Type), C.CString(res.Name)) |
|
if cRes[i] == nil { |
|
return nil, newErrorFromString(ErrInvalidArg, |
|
fmt.Sprintf("Invalid arguments for resource %v", res)) |
|
} |
|
|
|
defer C.rd_kafka_ConfigResource_destroy(cRes[i]) |
|
|
|
for _, entry := range res.Config { |
|
var cErr C.rd_kafka_resp_err_t |
|
switch entry.Operation { |
|
case AlterOperationSet: |
|
cErr = C.rd_kafka_ConfigResource_set_config( |
|
cRes[i], C.CString(entry.Name), C.CString(entry.Value)) |
|
default: |
|
panic(fmt.Sprintf("Invalid ConfigEntry.Operation: %v", entry.Operation)) |
|
} |
|
|
|
if cErr != 0 { |
|
return nil, |
|
newCErrorFromString(cErr, |
|
fmt.Sprintf("Failed to add configuration %s: %s", |
|
entry, C.GoString(C.rd_kafka_err2str(cErr)))) |
|
} |
|
} |
|
} |
|
|
|
// Convert Go AdminOptions (if any) to C AdminOptions |
|
genericOptions := make([]AdminOption, len(options)) |
|
for i := range options { |
|
genericOptions[i] = options[i] |
|
} |
|
cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_ALTERCONFIGS, genericOptions) |
|
if err != nil { |
|
return nil, err |
|
} |
|
defer C.rd_kafka_AdminOptions_destroy(cOptions) |
|
|
|
// Create temporary queue for async operation |
|
cQueue := C.rd_kafka_queue_new(a.handle.rk) |
|
defer C.rd_kafka_queue_destroy(cQueue) |
|
|
|
// Asynchronous call |
|
C.rd_kafka_AlterConfigs( |
|
a.handle.rk, |
|
(**C.rd_kafka_ConfigResource_t)(&cRes[0]), |
|
C.size_t(len(cRes)), |
|
cOptions, |
|
cQueue) |
|
|
|
// Wait for result, error or context timeout |
|
rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_ALTERCONFIGS_RESULT) |
|
if err != nil { |
|
return nil, err |
|
} |
|
defer C.rd_kafka_event_destroy(rkev) |
|
|
|
cResult := C.rd_kafka_event_AlterConfigs_result(rkev) |
|
|
|
// Convert results from C to Go |
|
var cCnt C.size_t |
|
cResults := C.rd_kafka_AlterConfigs_result_resources(cResult, &cCnt) |
|
|
|
return a.cConfigResourceToResult(cResults, cCnt) |
|
} |
|
|
|
// DescribeConfigs retrieves configuration for cluster resources. |
|
// |
|
// The returned configuration includes default values, use |
|
// ConfigEntryResult.IsDefault or ConfigEntryResult.Source to distinguish |
|
// default values from manually configured settings. |
|
// |
|
// The value of config entries where .IsSensitive is true |
|
// will always be nil to avoid disclosing sensitive |
|
// information, such as security settings. |
|
// |
|
// Configuration entries where .IsReadOnly is true can't be modified |
|
// (with AlterConfigs). |
|
// |
|
// Synonym configuration entries are returned if the broker supports |
|
// it (broker version >= 1.1.0). See .Synonyms. |
|
// |
|
// Requires broker version >=0.11.0.0 |
|
// |
|
// Multiple resources and resource types may be requested, but at most |
|
// one resource of type ResourceBroker is allowed per call |
|
// since these resource requests must be sent to the broker specified |
|
// in the resource. |
|
func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, options ...DescribeConfigsAdminOption) (result []ConfigResourceResult, err error) { |
|
cRes := make([]*C.rd_kafka_ConfigResource_t, len(resources)) |
|
|
|
cErrstrSize := C.size_t(512) |
|
cErrstr := (*C.char)(C.malloc(cErrstrSize)) |
|
defer C.free(unsafe.Pointer(cErrstr)) |
|
|
|
// Convert Go ConfigResources to C ConfigResources |
|
for i, res := range resources { |
|
cRes[i] = C.rd_kafka_ConfigResource_new( |
|
C.rd_kafka_ResourceType_t(res.Type), C.CString(res.Name)) |
|
if cRes[i] == nil { |
|
return nil, newErrorFromString(ErrInvalidArg, |
|
fmt.Sprintf("Invalid arguments for resource %v", res)) |
|
} |
|
|
|
defer C.rd_kafka_ConfigResource_destroy(cRes[i]) |
|
} |
|
|
|
// Convert Go AdminOptions (if any) to C AdminOptions |
|
genericOptions := make([]AdminOption, len(options)) |
|
for i := range options { |
|
genericOptions[i] = options[i] |
|
} |
|
cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, genericOptions) |
|
if err != nil { |
|
return nil, err |
|
} |
|
defer C.rd_kafka_AdminOptions_destroy(cOptions) |
|
|
|
// Create temporary queue for async operation |
|
cQueue := C.rd_kafka_queue_new(a.handle.rk) |
|
defer C.rd_kafka_queue_destroy(cQueue) |
|
|
|
// Asynchronous call |
|
C.rd_kafka_DescribeConfigs( |
|
a.handle.rk, |
|
(**C.rd_kafka_ConfigResource_t)(&cRes[0]), |
|
C.size_t(len(cRes)), |
|
cOptions, |
|
cQueue) |
|
|
|
// Wait for result, error or context timeout |
|
rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT) |
|
if err != nil { |
|
return nil, err |
|
} |
|
defer C.rd_kafka_event_destroy(rkev) |
|
|
|
cResult := C.rd_kafka_event_DescribeConfigs_result(rkev) |
|
|
|
// Convert results from C to Go |
|
var cCnt C.size_t |
|
cResults := C.rd_kafka_DescribeConfigs_result_resources(cResult, &cCnt) |
|
|
|
return a.cConfigResourceToResult(cResults, cCnt) |
|
} |
|
|
|
// GetMetadata queries broker for cluster and topic metadata. |
|
// If topic is non-nil only information about that topic is returned, else if |
|
// allTopics is false only information about locally used topics is returned, |
|
// else information about all topics is returned. |
|
// GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API. |
|
func (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) { |
|
return getMetadata(a, topic, allTopics, timeoutMs) |
|
} |
|
|
|
// String returns a human readable name for an AdminClient instance |
|
func (a *AdminClient) String() string { |
|
return fmt.Sprintf("admin-%s", a.handle.String()) |
|
} |
|
|
|
// get_handle implements the Handle interface |
|
func (a *AdminClient) gethandle() *handle { |
|
return a.handle |
|
} |
|
|
|
// SetOAuthBearerToken sets the the data to be transmitted |
|
// to a broker during SASL/OAUTHBEARER authentication. It will return nil |
|
// on success, otherwise an error if: |
|
// 1) the token data is invalid (meaning an expiration time in the past |
|
// or either a token value or an extension key or value that does not meet |
|
// the regular expression requirements as per |
|
// https://tools.ietf.org/html/rfc7628#section-3.1); |
|
// 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; |
|
// 3) SASL/OAUTHBEARER is supported but is not configured as the client's |
|
// authentication mechanism. |
|
func (a *AdminClient) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error { |
|
return a.handle.setOAuthBearerToken(oauthBearerToken) |
|
} |
|
|
|
// SetOAuthBearerTokenFailure sets the error message describing why token |
|
// retrieval/setting failed; it also schedules a new token refresh event for 10 |
|
// seconds later so the attempt may be retried. It will return nil on |
|
// success, otherwise an error if: |
|
// 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; |
|
// 2) SASL/OAUTHBEARER is supported but is not configured as the client's |
|
// authentication mechanism. |
|
func (a *AdminClient) SetOAuthBearerTokenFailure(errstr string) error { |
|
return a.handle.setOAuthBearerTokenFailure(errstr) |
|
} |
|
|
|
// aclBindingToC converts a Go ACLBinding struct to a C rd_kafka_AclBinding_t |
|
func (a *AdminClient) aclBindingToC(aclBinding *ACLBinding, cErrstr *C.char, cErrstrSize C.size_t) (result *C.rd_kafka_AclBinding_t, err error) { |
|
var cName, cPrincipal, cHost *C.char |
|
cName, cPrincipal, cHost = nil, nil, nil |
|
if len(aclBinding.Name) > 0 { |
|
cName = C.CString(aclBinding.Name) |
|
defer C.free(unsafe.Pointer(cName)) |
|
} |
|
if len(aclBinding.Principal) > 0 { |
|
cPrincipal = C.CString(aclBinding.Principal) |
|
defer C.free(unsafe.Pointer(cPrincipal)) |
|
} |
|
if len(aclBinding.Host) > 0 { |
|
cHost = C.CString(aclBinding.Host) |
|
defer C.free(unsafe.Pointer(cHost)) |
|
} |
|
|
|
result = C.rd_kafka_AclBinding_new( |
|
C.rd_kafka_ResourceType_t(aclBinding.Type), |
|
cName, |
|
C.rd_kafka_ResourcePatternType_t(aclBinding.ResourcePatternType), |
|
cPrincipal, |
|
cHost, |
|
C.rd_kafka_AclOperation_t(aclBinding.Operation), |
|
C.rd_kafka_AclPermissionType_t(aclBinding.PermissionType), |
|
cErrstr, |
|
cErrstrSize, |
|
) |
|
if result == nil { |
|
err = newErrorFromString(ErrInvalidArg, |
|
fmt.Sprintf("Invalid arguments for ACL binding %v: %v", aclBinding, C.GoString(cErrstr))) |
|
} |
|
return |
|
} |
|
|
|
// aclBindingFilterToC converts a Go ACLBindingFilter struct to a C rd_kafka_AclBindingFilter_t |
|
func (a *AdminClient) aclBindingFilterToC(aclBindingFilter *ACLBindingFilter, cErrstr *C.char, cErrstrSize C.size_t) (result *C.rd_kafka_AclBindingFilter_t, err error) { |
|
var cName, cPrincipal, cHost *C.char |
|
cName, cPrincipal, cHost = nil, nil, nil |
|
if len(aclBindingFilter.Name) > 0 { |
|
cName = C.CString(aclBindingFilter.Name) |
|
defer C.free(unsafe.Pointer(cName)) |
|
} |
|
if len(aclBindingFilter.Principal) > 0 { |
|
cPrincipal = C.CString(aclBindingFilter.Principal) |
|
defer C.free(unsafe.Pointer(cPrincipal)) |
|
} |
|
if len(aclBindingFilter.Host) > 0 { |
|
cHost = C.CString(aclBindingFilter.Host) |
|
defer C.free(unsafe.Pointer(cHost)) |
|
} |
|
|
|
result = C.rd_kafka_AclBindingFilter_new( |
|
C.rd_kafka_ResourceType_t(aclBindingFilter.Type), |
|
cName, |
|
C.rd_kafka_ResourcePatternType_t(aclBindingFilter.ResourcePatternType), |
|
cPrincipal, |
|
cHost, |
|
C.rd_kafka_AclOperation_t(aclBindingFilter.Operation), |
|
C.rd_kafka_AclPermissionType_t(aclBindingFilter.PermissionType), |
|
cErrstr, |
|
cErrstrSize, |
|
) |
|
if result == nil { |
|
err = newErrorFromString(ErrInvalidArg, |
|
fmt.Sprintf("Invalid arguments for ACL binding filter %v: %v", aclBindingFilter, C.GoString(cErrstr))) |
|
} |
|
return |
|
} |
|
|
|
// cToACLBinding converts a C rd_kafka_AclBinding_t to Go ACLBinding |
|
func (a *AdminClient) cToACLBinding(cACLBinding *C.rd_kafka_AclBinding_t) ACLBinding { |
|
return ACLBinding{ |
|
ResourceType(C.rd_kafka_AclBinding_restype(cACLBinding)), |
|
C.GoString(C.rd_kafka_AclBinding_name(cACLBinding)), |
|
ResourcePatternType(C.rd_kafka_AclBinding_resource_pattern_type(cACLBinding)), |
|
C.GoString(C.rd_kafka_AclBinding_principal(cACLBinding)), |
|
C.GoString(C.rd_kafka_AclBinding_host(cACLBinding)), |
|
ACLOperation(C.rd_kafka_AclBinding_operation(cACLBinding)), |
|
ACLPermissionType(C.rd_kafka_AclBinding_permission_type(cACLBinding)), |
|
} |
|
} |
|
|
|
// cToACLBindings converts a C rd_kafka_AclBinding_t list to Go ACLBindings |
|
func (a *AdminClient) cToACLBindings(cACLBindings **C.rd_kafka_AclBinding_t, aclCnt C.size_t) (result ACLBindings) { |
|
result = make(ACLBindings, aclCnt) |
|
for i := uint(0); i < uint(aclCnt); i++ { |
|
cACLBinding := C.AclBinding_by_idx(cACLBindings, aclCnt, C.size_t(i)) |
|
if cACLBinding == nil { |
|
panic("AclBinding_by_idx must not return nil") |
|
} |
|
result[i] = a.cToACLBinding(cACLBinding) |
|
} |
|
return |
|
} |
|
|
|
// cToCreateACLResults converts a C acl_result_t array to Go CreateACLResult list. |
|
func (a *AdminClient) cToCreateACLResults(cCreateAclsRes **C.rd_kafka_acl_result_t, aclCnt C.size_t) (result []CreateACLResult, err error) { |
|
result = make([]CreateACLResult, uint(aclCnt)) |
|
|
|
for i := uint(0); i < uint(aclCnt); i++ { |
|
cCreateACLRes := C.acl_result_by_idx(cCreateAclsRes, aclCnt, C.size_t(i)) |
|
if cCreateACLRes != nil { |
|
cCreateACLError := C.rd_kafka_acl_result_error(cCreateACLRes) |
|
result[i].Error = newErrorFromCError(cCreateACLError) |
|
} |
|
} |
|
|
|
return result, nil |
|
} |
|
|
|
// cToDescribeACLsResult converts a C rd_kafka_event_t to a Go DescribeAclsResult struct. |
|
func (a *AdminClient) cToDescribeACLsResult(rkev *C.rd_kafka_event_t) (result *DescribeACLsResult) { |
|
result = &DescribeACLsResult{} |
|
err := C.rd_kafka_event_error(rkev) |
|
errCode := ErrorCode(err) |
|
errStr := C.rd_kafka_event_error_string(rkev) |
|
|
|
var cResultACLsCount C.size_t |
|
cResult := C.rd_kafka_event_DescribeAcls_result(rkev) |
|
cResultACLs := C.rd_kafka_DescribeAcls_result_acls(cResult, &cResultACLsCount) |
|
if errCode != ErrNoError { |
|
result.Error = newErrorFromCString(err, errStr) |
|
} |
|
result.ACLBindings = a.cToACLBindings(cResultACLs, cResultACLsCount) |
|
return |
|
} |
|
|
|
// cToDeleteACLsResults converts a C rd_kafka_DeleteAcls_result_response_t array to Go DeleteAclsResult slice. |
|
func (a *AdminClient) cToDeleteACLsResults(cDeleteACLsResResponse **C.rd_kafka_DeleteAcls_result_response_t, resResponseCnt C.size_t) (result []DeleteACLsResult) { |
|
result = make([]DeleteACLsResult, uint(resResponseCnt)) |
|
|
|
for i := uint(0); i < uint(resResponseCnt); i++ { |
|
cDeleteACLsResResponse := C.DeleteAcls_result_response_by_idx(cDeleteACLsResResponse, resResponseCnt, C.size_t(i)) |
|
if cDeleteACLsResResponse == nil { |
|
panic("DeleteAcls_result_response_by_idx must not return nil") |
|
} |
|
|
|
cDeleteACLsError := C.rd_kafka_DeleteAcls_result_response_error(cDeleteACLsResResponse) |
|
result[i].Error = newErrorFromCError(cDeleteACLsError) |
|
|
|
var cMatchingACLsCount C.size_t |
|
cMatchingACLs := C.rd_kafka_DeleteAcls_result_response_matching_acls( |
|
cDeleteACLsResResponse, &cMatchingACLsCount) |
|
|
|
result[i].ACLBindings = a.cToACLBindings(cMatchingACLs, cMatchingACLsCount) |
|
} |
|
return |
|
} |
|
|
|
// CreateACLs creates one or more ACL bindings. |
|
// |
|
// Parameters: |
|
// * `ctx` - context with the maximum amount of time to block, or nil for indefinite. |
|
// * `aclBindings` - A slice of ACL binding specifications to create. |
|
// * `options` - Create ACLs options |
|
// |
|
// Returns a slice of CreateACLResult with a ErrNoError ErrorCode when the operation was successful |
|
// plus an error that is not nil for client level errors |
|
func (a *AdminClient) CreateACLs(ctx context.Context, aclBindings ACLBindings, options ...CreateACLsAdminOption) (result []CreateACLResult, err error) { |
|
if aclBindings == nil { |
|
return nil, newErrorFromString(ErrInvalidArg, |
|
"Expected non-nil slice of ACLBinding structs") |
|
} |
|
if len(aclBindings) == 0 { |
|
return nil, newErrorFromString(ErrInvalidArg, |
|
"Expected non-empty slice of ACLBinding structs") |
|
} |
|
|
|
cErrstrSize := C.size_t(512) |
|
cErrstr := (*C.char)(C.malloc(cErrstrSize)) |
|
defer C.free(unsafe.Pointer(cErrstr)) |
|
|
|
cACLBindings := make([]*C.rd_kafka_AclBinding_t, len(aclBindings)) |
|
|
|
for i, aclBinding := range aclBindings { |
|
cACLBindings[i], err = a.aclBindingToC(&aclBinding, cErrstr, cErrstrSize) |
|
if err != nil { |
|
return |
|
} |
|
defer C.rd_kafka_AclBinding_destroy(cACLBindings[i]) |
|
} |
|
|
|
// Convert Go AdminOptions (if any) to C AdminOptions |
|
genericOptions := make([]AdminOption, len(options)) |
|
for i := range options { |
|
genericOptions[i] = options[i] |
|
} |
|
cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_CREATEACLS, genericOptions) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
// Create temporary queue for async operation |
|
cQueue := C.rd_kafka_queue_new(a.handle.rk) |
|
defer C.rd_kafka_queue_destroy(cQueue) |
|
|
|
// Asynchronous call |
|
C.rd_kafka_CreateAcls( |
|
a.handle.rk, |
|
(**C.rd_kafka_AclBinding_t)(&cACLBindings[0]), |
|
C.size_t(len(cACLBindings)), |
|
cOptions, |
|
cQueue) |
|
|
|
// Wait for result, error or context timeout |
|
rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_CREATEACLS_RESULT) |
|
if err != nil { |
|
return nil, err |
|
} |
|
defer C.rd_kafka_event_destroy(rkev) |
|
|
|
var cResultCnt C.size_t |
|
cResult := C.rd_kafka_event_CreateAcls_result(rkev) |
|
aclResults := C.rd_kafka_CreateAcls_result_acls(cResult, &cResultCnt) |
|
result, err = a.cToCreateACLResults(aclResults, cResultCnt) |
|
return |
|
} |
|
|
|
// DescribeACLs matches ACL bindings by filter. |
|
// |
|
// Parameters: |
|
// * `ctx` - context with the maximum amount of time to block, or nil for indefinite. |
|
// * `aclBindingFilter` - A filter with attributes that must match. |
|
// string attributes match exact values or any string if set to empty string. |
|
// Enum attributes match exact values or any value if ending with `Any`. |
|
// If `ResourcePatternType` is set to `ResourcePatternTypeMatch` returns ACL bindings with: |
|
// - `ResourcePatternTypeLiteral` pattern type with resource name equal to the given resource name |
|
// - `ResourcePatternTypeLiteral` pattern type with wildcard resource name that matches the given resource name |
|
// - `ResourcePatternTypePrefixed` pattern type with resource name that is a prefix of the given resource name |
|
// * `options` - Describe ACLs options |
|
// |
|
// Returns a slice of ACLBindings when the operation was successful |
|
// plus an error that is not `nil` for client level errors |
|
func (a *AdminClient) DescribeACLs(ctx context.Context, aclBindingFilter ACLBindingFilter, options ...DescribeACLsAdminOption) (result *DescribeACLsResult, err error) { |
|
|
|
cErrstrSize := C.size_t(512) |
|
cErrstr := (*C.char)(C.malloc(cErrstrSize)) |
|
defer C.free(unsafe.Pointer(cErrstr)) |
|
|
|
cACLBindingFilter, err := a.aclBindingFilterToC(&aclBindingFilter, cErrstr, cErrstrSize) |
|
if err != nil { |
|
return |
|
} |
|
|
|
// Convert Go AdminOptions (if any) to C AdminOptions |
|
genericOptions := make([]AdminOption, len(options)) |
|
for i := range options { |
|
genericOptions[i] = options[i] |
|
} |
|
cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_DESCRIBEACLS, genericOptions) |
|
if err != nil { |
|
return nil, err |
|
} |
|
// Create temporary queue for async operation |
|
cQueue := C.rd_kafka_queue_new(a.handle.rk) |
|
defer C.rd_kafka_queue_destroy(cQueue) |
|
|
|
// Asynchronous call |
|
C.rd_kafka_DescribeAcls( |
|
a.handle.rk, |
|
cACLBindingFilter, |
|
cOptions, |
|
cQueue) |
|
|
|
// Wait for result, error or context timeout |
|
rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DESCRIBEACLS_RESULT) |
|
if err != nil { |
|
return nil, err |
|
} |
|
defer C.rd_kafka_event_destroy(rkev) |
|
result = a.cToDescribeACLsResult(rkev) |
|
return |
|
} |
|
|
|
// DeleteACLs deletes ACL bindings matching one or more ACL binding filters. |
|
// |
|
// Parameters: |
|
// * `ctx` - context with the maximum amount of time to block, or nil for indefinite. |
|
// * `aclBindingFilters` - a slice of ACL binding filters to match ACLs to delete. |
|
// string attributes match exact values or any string if set to empty string. |
|
// Enum attributes match exact values or any value if ending with `Any`. |
|
// If `ResourcePatternType` is set to `ResourcePatternTypeMatch` deletes ACL bindings with: |
|
// - `ResourcePatternTypeLiteral` pattern type with resource name equal to the given resource name |
|
// - `ResourcePatternTypeLiteral` pattern type with wildcard resource name that matches the given resource name |
|
// - `ResourcePatternTypePrefixed` pattern type with resource name that is a prefix of the given resource name |
|
// * `options` - Delete ACLs options |
|
// |
|
// Returns a slice of ACLBinding for each filter when the operation was successful |
|
// plus an error that is not `nil` for client level errors |
|
func (a *AdminClient) DeleteACLs(ctx context.Context, aclBindingFilters ACLBindingFilters, options ...DeleteACLsAdminOption) (result []DeleteACLsResult, err error) { |
|
if aclBindingFilters == nil { |
|
return nil, newErrorFromString(ErrInvalidArg, |
|
"Expected non-nil slice of ACLBindingFilter structs") |
|
} |
|
if len(aclBindingFilters) == 0 { |
|
return nil, newErrorFromString(ErrInvalidArg, |
|
"Expected non-empty slice of ACLBindingFilter structs") |
|
} |
|
|
|
cErrstrSize := C.size_t(512) |
|
cErrstr := (*C.char)(C.malloc(cErrstrSize)) |
|
defer C.free(unsafe.Pointer(cErrstr)) |
|
|
|
cACLBindingFilters := make([]*C.rd_kafka_AclBindingFilter_t, len(aclBindingFilters)) |
|
|
|
for i, aclBindingFilter := range aclBindingFilters { |
|
cACLBindingFilters[i], err = a.aclBindingFilterToC(&aclBindingFilter, cErrstr, cErrstrSize) |
|
if err != nil { |
|
return |
|
} |
|
defer C.rd_kafka_AclBinding_destroy(cACLBindingFilters[i]) |
|
} |
|
|
|
// Convert Go AdminOptions (if any) to C AdminOptions |
|
genericOptions := make([]AdminOption, len(options)) |
|
for i := range options { |
|
genericOptions[i] = options[i] |
|
} |
|
cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_DELETEACLS, genericOptions) |
|
if err != nil { |
|
return nil, err |
|
} |
|
// Create temporary queue for async operation |
|
cQueue := C.rd_kafka_queue_new(a.handle.rk) |
|
defer C.rd_kafka_queue_destroy(cQueue) |
|
|
|
// Asynchronous call |
|
C.rd_kafka_DeleteAcls( |
|
a.handle.rk, |
|
(**C.rd_kafka_AclBindingFilter_t)(&cACLBindingFilters[0]), |
|
C.size_t(len(cACLBindingFilters)), |
|
cOptions, |
|
cQueue) |
|
|
|
// Wait for result, error or context timeout |
|
rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DELETEACLS_RESULT) |
|
if err != nil { |
|
return nil, err |
|
} |
|
defer C.rd_kafka_event_destroy(rkev) |
|
|
|
var cResultResponsesCount C.size_t |
|
cResult := C.rd_kafka_event_DeleteAcls_result(rkev) |
|
cResultResponses := C.rd_kafka_DeleteAcls_result_responses(cResult, &cResultResponsesCount) |
|
result = a.cToDeleteACLsResults(cResultResponses, cResultResponsesCount) |
|
return |
|
} |
|
|
|
// Close an AdminClient instance. |
|
func (a *AdminClient) Close() { |
|
if a.isDerived { |
|
// Derived AdminClient needs no cleanup. |
|
a.handle = &handle{} |
|
return |
|
} |
|
|
|
a.handle.cleanup() |
|
|
|
C.rd_kafka_destroy(a.handle.rk) |
|
} |
|
|
|
// NewAdminClient creats a new AdminClient instance with a new underlying client instance |
|
func NewAdminClient(conf *ConfigMap) (*AdminClient, error) { |
|
|
|
err := versionCheck() |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
a := &AdminClient{} |
|
a.handle = &handle{} |
|
|
|
// Convert ConfigMap to librdkafka conf_t |
|
cConf, err := conf.convert() |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
cErrstr := (*C.char)(C.malloc(C.size_t(256))) |
|
defer C.free(unsafe.Pointer(cErrstr)) |
|
|
|
C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR|C.RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH) |
|
|
|
// Create librdkafka producer instance. The Producer is somewhat cheaper than |
|
// the consumer, but any instance type can be used for Admin APIs. |
|
a.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256) |
|
if a.handle.rk == nil { |
|
return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr) |
|
} |
|
|
|
a.isDerived = false |
|
a.handle.setup() |
|
|
|
return a, nil |
|
} |
|
|
|
// NewAdminClientFromProducer derives a new AdminClient from an existing Producer instance. |
|
// The AdminClient will use the same configuration and connections as the parent instance. |
|
func NewAdminClientFromProducer(p *Producer) (a *AdminClient, err error) { |
|
if p.handle.rk == nil { |
|
return nil, newErrorFromString(ErrInvalidArg, "Can't derive AdminClient from closed producer") |
|
} |
|
|
|
a = &AdminClient{} |
|
a.handle = &p.handle |
|
a.isDerived = true |
|
return a, nil |
|
} |
|
|
|
// NewAdminClientFromConsumer derives a new AdminClient from an existing Consumer instance. |
|
// The AdminClient will use the same configuration and connections as the parent instance. |
|
func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error) { |
|
if c.handle.rk == nil { |
|
return nil, newErrorFromString(ErrInvalidArg, "Can't derive AdminClient from closed consumer") |
|
} |
|
|
|
a = &AdminClient{} |
|
a.handle = &c.handle |
|
a.isDerived = true |
|
return a, nil |
|
}
|
|
|