You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
316 lines
8.3 KiB
316 lines
8.3 KiB
/** |
|
* Copyright 2016 Confluent Inc. |
|
* |
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
* you may not use this file except in compliance with the License. |
|
* You may obtain a copy of the License at |
|
* |
|
* http://www.apache.org/licenses/LICENSE-2.0 |
|
* |
|
* Unless required by applicable law or agreed to in writing, software |
|
* distributed under the License is distributed on an "AS IS" BASIS, |
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
* See the License for the specific language governing permissions and |
|
* limitations under the License. |
|
*/ |
|
|
|
package kafka |
|
|
|
import ( |
|
"fmt" |
|
"os" |
|
"unsafe" |
|
) |
|
|
|
/* |
|
#include <stdlib.h> |
|
#include "select_rdkafka.h" |
|
#include "glue_rdkafka.h" |
|
|
|
|
|
void chdrs_to_tmphdrs (glue_msg_t *gMsg) { |
|
size_t i = 0; |
|
const char *name; |
|
const void *val; |
|
size_t size; |
|
rd_kafka_headers_t *chdrs; |
|
|
|
if (rd_kafka_message_headers(gMsg->msg, &chdrs)) { |
|
gMsg->tmphdrs = NULL; |
|
gMsg->tmphdrsCnt = 0; |
|
return; |
|
} |
|
|
|
gMsg->tmphdrsCnt = rd_kafka_header_cnt(chdrs); |
|
gMsg->tmphdrs = malloc(sizeof(*gMsg->tmphdrs) * gMsg->tmphdrsCnt); |
|
|
|
while (!rd_kafka_header_get_all(chdrs, i, |
|
&gMsg->tmphdrs[i].key, |
|
&gMsg->tmphdrs[i].val, |
|
(size_t *)&gMsg->tmphdrs[i].size)) |
|
i++; |
|
} |
|
|
|
rd_kafka_event_t *_rk_queue_poll (rd_kafka_queue_t *rkq, int timeoutMs, |
|
rd_kafka_event_type_t *evtype, |
|
glue_msg_t *gMsg, |
|
rd_kafka_event_t *prev_rkev) { |
|
rd_kafka_event_t *rkev; |
|
|
|
if (prev_rkev) |
|
rd_kafka_event_destroy(prev_rkev); |
|
|
|
rkev = rd_kafka_queue_poll(rkq, timeoutMs); |
|
*evtype = rd_kafka_event_type(rkev); |
|
|
|
if (*evtype == RD_KAFKA_EVENT_FETCH) { |
|
gMsg->msg = (rd_kafka_message_t *)rd_kafka_event_message_next(rkev); |
|
gMsg->ts = rd_kafka_message_timestamp(gMsg->msg, &gMsg->tstype); |
|
|
|
if (gMsg->want_hdrs) |
|
chdrs_to_tmphdrs(gMsg); |
|
} |
|
|
|
return rkev; |
|
} |
|
*/ |
|
import "C" |
|
|
|
func chdrsToTmphdrs(gMsg *C.glue_msg_t) { |
|
C.chdrs_to_tmphdrs(gMsg) |
|
} |
|
|
|
// Event generic interface |
|
type Event interface { |
|
// String returns a human-readable representation of the event |
|
String() string |
|
} |
|
|
|
// Specific event types |
|
|
|
// Stats statistics event |
|
type Stats struct { |
|
statsJSON string |
|
} |
|
|
|
func (e Stats) String() string { |
|
return e.statsJSON |
|
} |
|
|
|
// AssignedPartitions consumer group rebalance event: assigned partition set |
|
type AssignedPartitions struct { |
|
Partitions []TopicPartition |
|
} |
|
|
|
func (e AssignedPartitions) String() string { |
|
return fmt.Sprintf("AssignedPartitions: %v", e.Partitions) |
|
} |
|
|
|
// RevokedPartitions consumer group rebalance event: revoked partition set |
|
type RevokedPartitions struct { |
|
Partitions []TopicPartition |
|
} |
|
|
|
func (e RevokedPartitions) String() string { |
|
return fmt.Sprintf("RevokedPartitions: %v", e.Partitions) |
|
} |
|
|
|
// PartitionEOF consumer reached end of partition |
|
// Needs to be explicitly enabled by setting the `enable.partition.eof` |
|
// configuration property to true. |
|
type PartitionEOF TopicPartition |
|
|
|
func (p PartitionEOF) String() string { |
|
return fmt.Sprintf("EOF at %s", TopicPartition(p)) |
|
} |
|
|
|
// OffsetsCommitted reports committed offsets |
|
type OffsetsCommitted struct { |
|
Error error |
|
Offsets []TopicPartition |
|
} |
|
|
|
func (o OffsetsCommitted) String() string { |
|
return fmt.Sprintf("OffsetsCommitted (%v, %v)", o.Error, o.Offsets) |
|
} |
|
|
|
// OAuthBearerTokenRefresh indicates token refresh is required |
|
type OAuthBearerTokenRefresh struct { |
|
// Config is the value of the sasl.oauthbearer.config property |
|
Config string |
|
} |
|
|
|
func (o OAuthBearerTokenRefresh) String() string { |
|
return "OAuthBearerTokenRefresh" |
|
} |
|
|
|
// eventPoll polls an event from the handler's C rd_kafka_queue_t, |
|
// translates it into an Event type and then sends on `channel` if non-nil, else returns the Event. |
|
// term_chan is an optional channel to monitor along with producing to channel |
|
// to indicate that `channel` is being terminated. |
|
// returns (event Event, terminate Bool) tuple, where Terminate indicates |
|
// if termChan received a termination event. |
|
func (h *handle) eventPoll(channel chan Event, timeoutMs int, maxEvents int, termChan chan bool) (Event, bool) { |
|
|
|
var prevRkev *C.rd_kafka_event_t |
|
term := false |
|
|
|
var retval Event |
|
|
|
if channel == nil { |
|
maxEvents = 1 |
|
} |
|
out: |
|
for evcnt := 0; evcnt < maxEvents; evcnt++ { |
|
var evtype C.rd_kafka_event_type_t |
|
var gMsg C.glue_msg_t |
|
gMsg.want_hdrs = C.int8_t(bool2cint(h.msgFields.Headers)) |
|
rkev := C._rk_queue_poll(h.rkq, C.int(timeoutMs), &evtype, &gMsg, prevRkev) |
|
prevRkev = rkev |
|
timeoutMs = 0 |
|
|
|
retval = nil |
|
|
|
switch evtype { |
|
case C.RD_KAFKA_EVENT_FETCH: |
|
// Consumer fetch event, new message. |
|
// Extracted into temporary gMsg for optimization |
|
retval = h.newMessageFromGlueMsg(&gMsg) |
|
|
|
case C.RD_KAFKA_EVENT_REBALANCE: |
|
// Consumer rebalance event |
|
retval = h.c.handleRebalanceEvent(channel, rkev) |
|
|
|
case C.RD_KAFKA_EVENT_ERROR: |
|
// Error event |
|
cErr := C.rd_kafka_event_error(rkev) |
|
if cErr == C.RD_KAFKA_RESP_ERR__PARTITION_EOF { |
|
crktpar := C.rd_kafka_event_topic_partition(rkev) |
|
if crktpar == nil { |
|
break |
|
} |
|
|
|
defer C.rd_kafka_topic_partition_destroy(crktpar) |
|
var peof PartitionEOF |
|
setupTopicPartitionFromCrktpar((*TopicPartition)(&peof), crktpar) |
|
|
|
retval = peof |
|
|
|
} else if int(C.rd_kafka_event_error_is_fatal(rkev)) != 0 { |
|
// A fatal error has been raised. |
|
// Extract the actual error from the client |
|
// instance and return a new Error with |
|
// fatal set to true. |
|
cFatalErrstrSize := C.size_t(512) |
|
cFatalErrstr := (*C.char)(C.malloc(cFatalErrstrSize)) |
|
defer C.free(unsafe.Pointer(cFatalErrstr)) |
|
cFatalErr := C.rd_kafka_fatal_error(h.rk, cFatalErrstr, cFatalErrstrSize) |
|
fatalErr := newErrorFromCString(cFatalErr, cFatalErrstr) |
|
fatalErr.fatal = true |
|
retval = fatalErr |
|
|
|
} else { |
|
retval = newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev)) |
|
} |
|
|
|
case C.RD_KAFKA_EVENT_STATS: |
|
retval = &Stats{C.GoString(C.rd_kafka_event_stats(rkev))} |
|
|
|
case C.RD_KAFKA_EVENT_DR: |
|
// Producer Delivery Report event |
|
// Each such event contains delivery reports for all |
|
// messages in the produced batch. |
|
// Forward delivery reports to per-message's response channel |
|
// or to the global Producer.Events channel, or none. |
|
rkmessages := make([]*C.rd_kafka_message_t, int(C.rd_kafka_event_message_count(rkev))) |
|
|
|
cnt := int(C.rd_kafka_event_message_array(rkev, (**C.rd_kafka_message_t)(unsafe.Pointer(&rkmessages[0])), C.size_t(len(rkmessages)))) |
|
|
|
for _, rkmessage := range rkmessages[:cnt] { |
|
msg := h.newMessageFromC(rkmessage) |
|
var ch *chan Event |
|
|
|
if rkmessage._private != nil { |
|
// Find cgoif by id |
|
cg, found := h.cgoGet((int)((uintptr)(rkmessage._private))) |
|
if found { |
|
cdr := cg.(cgoDr) |
|
|
|
if cdr.deliveryChan != nil { |
|
ch = &cdr.deliveryChan |
|
} |
|
msg.Opaque = cdr.opaque |
|
} |
|
} |
|
|
|
if ch == nil && h.fwdDr { |
|
ch = &channel |
|
} |
|
|
|
if ch != nil { |
|
select { |
|
case *ch <- msg: |
|
case <-termChan: |
|
retval = nil |
|
term = true |
|
break out |
|
} |
|
|
|
} else { |
|
retval = msg |
|
break out |
|
} |
|
} |
|
|
|
case C.RD_KAFKA_EVENT_OFFSET_COMMIT: |
|
// Offsets committed |
|
cErr := C.rd_kafka_event_error(rkev) |
|
coffsets := C.rd_kafka_event_topic_partition_list(rkev) |
|
var offsets []TopicPartition |
|
if coffsets != nil { |
|
offsets = newTopicPartitionsFromCparts(coffsets) |
|
} |
|
|
|
if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR { |
|
retval = OffsetsCommitted{newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev)), offsets} |
|
} else { |
|
retval = OffsetsCommitted{nil, offsets} |
|
} |
|
|
|
case C.RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH: |
|
ev := OAuthBearerTokenRefresh{C.GoString(C.rd_kafka_event_config_string(rkev))} |
|
retval = ev |
|
|
|
case C.RD_KAFKA_EVENT_NONE: |
|
// poll timed out: no events available |
|
break out |
|
|
|
default: |
|
if rkev != nil { |
|
fmt.Fprintf(os.Stderr, "Ignored event %s\n", |
|
C.GoString(C.rd_kafka_event_name(rkev))) |
|
} |
|
|
|
} |
|
|
|
if retval != nil { |
|
if channel != nil { |
|
select { |
|
case channel <- retval: |
|
case <-termChan: |
|
retval = nil |
|
term = true |
|
break out |
|
} |
|
} else { |
|
break out |
|
} |
|
} |
|
} |
|
|
|
if prevRkev != nil { |
|
C.rd_kafka_event_destroy(prevRkev) |
|
} |
|
|
|
return retval, term |
|
}
|
|
|