You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
159 lines
4.5 KiB
159 lines
4.5 KiB
/** |
|
* Copyright 2016 Confluent Inc. |
|
* |
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
* you may not use this file except in compliance with the License. |
|
* You may obtain a copy of the License at |
|
* |
|
* http://www.apache.org/licenses/LICENSE-2.0 |
|
* |
|
* Unless required by applicable law or agreed to in writing, software |
|
* distributed under the License is distributed on an "AS IS" BASIS, |
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
* See the License for the specific language governing permissions and |
|
* limitations under the License. |
|
*/ |
|
|
|
package kafka |
|
|
|
// Automatically generate error codes from librdkafka |
|
// See README for instructions |
|
//go:generate $GOPATH/bin/go_rdkafka_generr generated_errors.go |
|
|
|
/* |
|
#include <stdlib.h> |
|
#include "select_rdkafka.h" |
|
*/ |
|
import "C" |
|
|
|
import ( |
|
"fmt" |
|
"unsafe" |
|
) |
|
|
|
// Error provides a Kafka-specific error container |
|
type Error struct { |
|
code ErrorCode |
|
str string |
|
fatal bool |
|
retriable bool |
|
txnRequiresAbort bool |
|
} |
|
|
|
func newError(code C.rd_kafka_resp_err_t) (err Error) { |
|
return Error{code: ErrorCode(code)} |
|
} |
|
|
|
// NewError creates a new Error. |
|
func NewError(code ErrorCode, str string, fatal bool) (err Error) { |
|
return Error{code: code, str: str, fatal: fatal} |
|
} |
|
|
|
func newErrorFromString(code ErrorCode, str string) (err Error) { |
|
return Error{code: code, str: str} |
|
} |
|
|
|
func newErrorFromCString(code C.rd_kafka_resp_err_t, cstr *C.char) (err Error) { |
|
var str string |
|
if cstr != nil { |
|
str = C.GoString(cstr) |
|
} else { |
|
str = "" |
|
} |
|
return Error{code: ErrorCode(code), str: str} |
|
} |
|
|
|
func newCErrorFromString(code C.rd_kafka_resp_err_t, str string) (err Error) { |
|
return newErrorFromString(ErrorCode(code), str) |
|
} |
|
|
|
// newErrorFromCError creates a new Error instance |
|
func newErrorFromCError(cError *C.rd_kafka_error_t) Error { |
|
return Error{ |
|
code: ErrorCode(C.rd_kafka_error_code(cError)), |
|
str: C.GoString(C.rd_kafka_error_string(cError)), |
|
fatal: cint2bool(C.rd_kafka_error_is_fatal(cError)), |
|
retriable: cint2bool(C.rd_kafka_error_is_retriable(cError)), |
|
txnRequiresAbort: cint2bool(C.rd_kafka_error_txn_requires_abort(cError)), |
|
} |
|
} |
|
|
|
// newErrorFromCErrorDestroy creates a new Error instance and destroys |
|
// the passed cError. |
|
func newErrorFromCErrorDestroy(cError *C.rd_kafka_error_t) Error { |
|
defer C.rd_kafka_error_destroy(cError) |
|
return newErrorFromCError(cError) |
|
} |
|
|
|
// Error returns a human readable representation of an Error |
|
// Same as Error.String() |
|
func (e Error) Error() string { |
|
return e.String() |
|
} |
|
|
|
// String returns a human readable representation of an Error |
|
func (e Error) String() string { |
|
var errstr string |
|
if len(e.str) > 0 { |
|
errstr = e.str |
|
} else { |
|
errstr = e.code.String() |
|
} |
|
|
|
if e.IsFatal() { |
|
return fmt.Sprintf("Fatal error: %s", errstr) |
|
} |
|
|
|
return errstr |
|
} |
|
|
|
// Code returns the ErrorCode of an Error |
|
func (e Error) Code() ErrorCode { |
|
return e.code |
|
} |
|
|
|
// IsFatal returns true if the error is a fatal error. |
|
// A fatal error indicates the client instance is no longer operable and |
|
// should be terminated. Typical causes include non-recoverable |
|
// idempotent producer errors. |
|
func (e Error) IsFatal() bool { |
|
return e.fatal |
|
} |
|
|
|
// IsRetriable returns true if the operation that caused this error |
|
// may be retried. |
|
// This flag is currently only set by the Transactional producer API. |
|
func (e Error) IsRetriable() bool { |
|
return e.retriable |
|
} |
|
|
|
// TxnRequiresAbort returns true if the error is an abortable transaction error |
|
// that requires the application to abort the current transaction with |
|
// AbortTransaction() and start a new transaction with BeginTransaction() |
|
// if it wishes to proceed with transactional operations. |
|
// This flag is only set by the Transactional producer API. |
|
func (e Error) TxnRequiresAbort() bool { |
|
return e.txnRequiresAbort |
|
} |
|
|
|
// getFatalError returns an Error object if the client instance has raised a fatal error, else nil. |
|
func getFatalError(H Handle) error { |
|
cErrstr := (*C.char)(C.malloc(C.size_t(512))) |
|
defer C.free(unsafe.Pointer(cErrstr)) |
|
|
|
cErr := C.rd_kafka_fatal_error(H.gethandle().rk, cErrstr, 512) |
|
if int(cErr) == 0 { |
|
return nil |
|
} |
|
|
|
err := newErrorFromCString(cErr, cErrstr) |
|
err.fatal = true |
|
|
|
return err |
|
} |
|
|
|
// testFatalError triggers a fatal error in the underlying client. |
|
// This is to be used strictly for testing purposes. |
|
func testFatalError(H Handle, code ErrorCode, str string) ErrorCode { |
|
return ErrorCode(C.rd_kafka_test_fatal_error(H.gethandle().rk, C.rd_kafka_resp_err_t(code), C.CString(str))) |
|
}
|
|
|