You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

379 lines
10 KiB

/**
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka
import (
"fmt"
"strings"
"sync"
"time"
"unsafe"
)
/*
#include "select_rdkafka.h"
#include <stdlib.h>
*/
import "C"
// OAuthBearerToken represents the data to be transmitted
// to a broker during SASL/OAUTHBEARER authentication.
type OAuthBearerToken struct {
// Token value, often (but not necessarily) a JWS compact serialization
// as per https://tools.ietf.org/html/rfc7515#section-3.1; it must meet
// the regular expression for a SASL/OAUTHBEARER value defined at
// https://tools.ietf.org/html/rfc7628#section-3.1
TokenValue string
// Metadata about the token indicating when it expires (local time);
// it must represent a time in the future
Expiration time.Time
// Metadata about the token indicating the Kafka principal name
// to which it applies (for example, "admin")
Principal string
// SASL extensions, if any, to be communicated to the broker during
// authentication (all keys and values of which must meet the regular
// expressions defined at https://tools.ietf.org/html/rfc7628#section-3.1,
// and it must not contain the reserved "auth" key)
Extensions map[string]string
}
// Handle represents a generic client handle containing common parts for
// both Producer and Consumer.
type Handle interface {
// SetOAuthBearerToken sets the the data to be transmitted
// to a broker during SASL/OAUTHBEARER authentication. It will return nil
// on success, otherwise an error if:
// 1) the token data is invalid (meaning an expiration time in the past
// or either a token value or an extension key or value that does not meet
// the regular expression requirements as per
// https://tools.ietf.org/html/rfc7628#section-3.1);
// 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build;
// 3) SASL/OAUTHBEARER is supported but is not configured as the client's
// authentication mechanism.
SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
// SetOAuthBearerTokenFailure sets the error message describing why token
// retrieval/setting failed; it also schedules a new token refresh event for 10
// seconds later so the attempt may be retried. It will return nil on
// success, otherwise an error if:
// 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build;
// 2) SASL/OAUTHBEARER is supported but is not configured as the client's
// authentication mechanism.
SetOAuthBearerTokenFailure(errstr string) error
// gethandle() returns the internal handle struct pointer
gethandle() *handle
}
// Common instance handle for both Producer and Consumer
type handle struct {
rk *C.rd_kafka_t
rkq *C.rd_kafka_queue_t
// Forward logs from librdkafka log queue to logs channel.
logs chan LogEvent
logq *C.rd_kafka_queue_t
closeLogsChan bool
// Topic <-> rkt caches
rktCacheLock sync.Mutex
// topic name -> rkt cache
rktCache map[string]*C.rd_kafka_topic_t
// rkt -> topic name cache
rktNameCache map[*C.rd_kafka_topic_t]string
// Cached instance name to avoid CGo call in String()
name string
//
// cgo map
// Maps C callbacks based on cgoid back to its Go object
cgoLock sync.Mutex
cgoidNext uintptr
cgomap map[int]cgoif
//
// producer
//
p *Producer
// Forward delivery reports on Producer.Events channel
fwdDr bool
// Enabled message fields for delivery reports and consumed messages.
msgFields *messageFields
//
// consumer
//
c *Consumer
// WaitGroup to wait for spawned go-routines to finish.
waitGroup sync.WaitGroup
}
func (h *handle) String() string {
return h.name
}
func (h *handle) setup() {
h.rktCache = make(map[string]*C.rd_kafka_topic_t)
h.rktNameCache = make(map[*C.rd_kafka_topic_t]string)
h.cgomap = make(map[int]cgoif)
h.name = C.GoString(C.rd_kafka_name(h.rk))
if h.msgFields == nil {
h.msgFields = newMessageFields()
}
}
func (h *handle) cleanup() {
if h.logs != nil {
C.rd_kafka_queue_destroy(h.logq)
if h.closeLogsChan {
close(h.logs)
}
}
for _, crkt := range h.rktCache {
C.rd_kafka_topic_destroy(crkt)
}
if h.rkq != nil {
C.rd_kafka_queue_destroy(h.rkq)
}
}
func (h *handle) setupLogQueue(logsChan chan LogEvent, termChan chan bool) {
if logsChan == nil {
logsChan = make(chan LogEvent, 10000)
h.closeLogsChan = true
}
h.logs = logsChan
// Let librdkafka forward logs to our log queue instead of the main queue
h.logq = C.rd_kafka_queue_new(h.rk)
C.rd_kafka_set_log_queue(h.rk, h.logq)
// Start a polling goroutine to consume the log queue
h.waitGroup.Add(1)
go func() {
h.pollLogEvents(h.logs, 100, termChan)
h.waitGroup.Done()
}()
}
// getRkt0 finds or creates and returns a C topic_t object from the local cache.
func (h *handle) getRkt0(topic string, ctopic *C.char, doLock bool) (crkt *C.rd_kafka_topic_t) {
if doLock {
h.rktCacheLock.Lock()
defer h.rktCacheLock.Unlock()
}
crkt, ok := h.rktCache[topic]
if ok {
return crkt
}
if ctopic == nil {
ctopic = C.CString(topic)
defer C.free(unsafe.Pointer(ctopic))
}
crkt = C.rd_kafka_topic_new(h.rk, ctopic, nil)
if crkt == nil {
panic(fmt.Sprintf("Unable to create new C topic \"%s\": %s",
topic, C.GoString(C.rd_kafka_err2str(C.rd_kafka_last_error()))))
}
h.rktCache[topic] = crkt
h.rktNameCache[crkt] = topic
return crkt
}
// getRkt finds or creates and returns a C topic_t object from the local cache.
func (h *handle) getRkt(topic string) (crkt *C.rd_kafka_topic_t) {
return h.getRkt0(topic, nil, true)
}
// getTopicNameFromRkt returns the topic name for a C topic_t object, preferably
// using the local cache to avoid a cgo call.
func (h *handle) getTopicNameFromRkt(crkt *C.rd_kafka_topic_t) (topic string) {
h.rktCacheLock.Lock()
defer h.rktCacheLock.Unlock()
topic, ok := h.rktNameCache[crkt]
if ok {
return topic
}
// we need our own copy/refcount of the crkt
ctopic := C.rd_kafka_topic_name(crkt)
topic = C.GoString(ctopic)
crkt = h.getRkt0(topic, ctopic, false /* dont lock */)
return topic
}
// cgoif is a generic interface for holding Go state passed as opaque
// value to the C code.
// Since pointers to complex Go types cannot be passed to C we instead create
// a cgoif object, generate a unique id that is added to the cgomap,
// and then pass that id to the C code. When the C code callback is called we
// use the id to look up the cgoif object in the cgomap.
type cgoif interface{}
// delivery report cgoif container
type cgoDr struct {
deliveryChan chan Event
opaque interface{}
}
// cgoPut adds object cg to the handle's cgo map and returns a
// unique id for the added entry.
// Thread-safe.
// FIXME: the uniquity of the id is questionable over time.
func (h *handle) cgoPut(cg cgoif) (cgoid int) {
h.cgoLock.Lock()
defer h.cgoLock.Unlock()
h.cgoidNext++
if h.cgoidNext == 0 {
h.cgoidNext++
}
cgoid = (int)(h.cgoidNext)
h.cgomap[cgoid] = cg
return cgoid
}
// cgoGet looks up cgoid in the cgo map, deletes the reference from the map
// and returns the object, if found. Else returns nil, false.
// Thread-safe.
func (h *handle) cgoGet(cgoid int) (cg cgoif, found bool) {
if cgoid == 0 {
return nil, false
}
h.cgoLock.Lock()
defer h.cgoLock.Unlock()
cg, found = h.cgomap[cgoid]
if found {
delete(h.cgomap, cgoid)
}
return cg, found
}
// setOauthBearerToken - see rd_kafka_oauthbearer_set_token()
func (h *handle) setOAuthBearerToken(oauthBearerToken OAuthBearerToken) error {
cTokenValue := C.CString(oauthBearerToken.TokenValue)
defer C.free(unsafe.Pointer(cTokenValue))
cPrincipal := C.CString(oauthBearerToken.Principal)
defer C.free(unsafe.Pointer(cPrincipal))
cErrstrSize := C.size_t(512)
cErrstr := (*C.char)(C.malloc(cErrstrSize))
defer C.free(unsafe.Pointer(cErrstr))
cExtensions := make([]*C.char, 2*len(oauthBearerToken.Extensions))
extensionSize := 0
for key, value := range oauthBearerToken.Extensions {
cExtensions[extensionSize] = C.CString(key)
defer C.free(unsafe.Pointer(cExtensions[extensionSize]))
extensionSize++
cExtensions[extensionSize] = C.CString(value)
defer C.free(unsafe.Pointer(cExtensions[extensionSize]))
extensionSize++
}
var cExtensionsToUse **C.char
if extensionSize > 0 {
cExtensionsToUse = (**C.char)(unsafe.Pointer(&cExtensions[0]))
}
cErr := C.rd_kafka_oauthbearer_set_token(h.rk, cTokenValue,
C.int64_t(oauthBearerToken.Expiration.UnixNano()/(1000*1000)), cPrincipal,
cExtensionsToUse, C.size_t(extensionSize), cErrstr, cErrstrSize)
if cErr == C.RD_KAFKA_RESP_ERR_NO_ERROR {
return nil
}
return newErrorFromCString(cErr, cErrstr)
}
// setOauthBearerTokenFailure - see rd_kafka_oauthbearer_set_token_failure()
func (h *handle) setOAuthBearerTokenFailure(errstr string) error {
cerrstr := C.CString(errstr)
defer C.free(unsafe.Pointer(cerrstr))
cErr := C.rd_kafka_oauthbearer_set_token_failure(h.rk, cerrstr)
if cErr == C.RD_KAFKA_RESP_ERR_NO_ERROR {
return nil
}
return newError(cErr)
}
// messageFields controls which fields are made available for producer delivery reports & consumed messages.
// true values indicate that the field should be included
type messageFields struct {
Key bool
Value bool
Headers bool
}
// disableAll disable all fields
func (mf *messageFields) disableAll() {
mf.Key = false
mf.Value = false
mf.Headers = false
}
// newMessageFields returns a new messageFields with all fields enabled
func newMessageFields() *messageFields {
return &messageFields{
Key: true,
Value: true,
Headers: true,
}
}
// newMessageFieldsFrom constructs a new messageFields from the given configuration value
func newMessageFieldsFrom(v ConfigValue) (*messageFields, error) {
msgFields := newMessageFields()
switch v {
case "all":
// nothing to do
case "", "none":
msgFields.disableAll()
default:
msgFields.disableAll()
for _, value := range strings.Split(v.(string), ",") {
switch value {
case "key":
msgFields.Key = true
case "value":
msgFields.Value = true
case "headers":
msgFields.Headers = true
default:
return nil, fmt.Errorf("unknown message field: %s", value)
}
}
}
return msgFields, nil
}