The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

NAME

Kafka - constants and messages used by the Kafka package modules

VERSION

This documentation refers to Kafka package version 0.12

SYNOPSIS

An example of Kafka usage:

    use Kafka qw(
        BITS64
        KAFKA_SERVER_PORT
        DEFAULT_TIMEOUT
        TIMESTAMP_EARLIEST
        DEFAULT_MAX_OFFSETS
        DEFAULT_MAX_SIZE
        );

    # common information
    print "This is Kafka package $Kafka::VERSION\n";
    print "You have a ", BITS64 ? "64" : "32", " bit system\n";

    use Kafka::IO;

    # connect to local server with the defaults
    my $io = Kafka::IO->new( host => "localhost" );

    # decoding of the error code
    unless ( $io )
    {
        print STDERR "last error: ",
            $Kafka::ERROR[Kafka::IO::last_errorcode], "\n";
    }

To see a brief but working code example of the Kafka package usage look at the "An Example" section.

ABSTRACT

The Kafka package is a set of Perl modules which provides a simple and consistent application programming interface (API) to Apache Kafka 0.7, a high-throughput distributed messaging system. This is a low-level API implementation which DOES NOT interract with an Apache ZooKeeper for consumer coordination and/or load balancing.

DESCRIPTION

The user modules in this package provide an object oriented API. The IO agents, requests sent, and responses received from the Apache Kafka or mock servers are all represented by objects. This makes a simple and powerful interface to these services.

The main features of the package are:

  • Contains various reusable components (modules) that can be used separately or together.

  • Provides an object oriented model of communication.

  • Supports parsing the Apache Kafka Wire Format protocol.

  • Supports the Apache Kafka Requests and Responses (PRODUCE and FETCH with no compression codec attribute now). Within this package we currently support access to the PRODUCE Request, FETCH Request, OFFSETS Request, FETCH Response, OFFSETS Response.

  • Simple producer and consumer clients.

  • Simple mock server instance for testing without Apache Kafka server.

  • Support for working with 64 bit elements of the Kafka Wire Format protocol on 32 bit systems.

APACHE KAFKA'S STYLE COMMUNICATION

The Kafka package is based on Kafka's 0.7 Wire Format specification document at http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/

  • The Kafka's Wire Format protocol is based on a request/response paradigm. A client establishes a connection with a server and sends a request to the server in the form of a request method, followed by a messages containing request modifiers. The server responds with a success or error code, followed by a messages containing entity meta-information and content.

Communication with Kafka always assumes to follow these steps: First the IO and client objects are created and configured.

The Kafka client has the class name Kafka::Producer or Kafka::Consumer.

Messages are the fundamental unit of communication. They are published to a topic by a producer, which means they are physically sent to a server acting as a broker. Some number of consumers subscribe to a topic, and each published message is delivered to all the consumers. The messages stream is partitioned on the brokers as a set of distinct partitions. The semantic meaning of these partitions is left up to the producer and the producer specifies which partition a message belongs to. Within a partition the messages are stored in the order in which they arrive at the broker, and will be given out to consumers in that same order. In Apache Kafka, the consumers are responsible for maintaining state information (offset) on what has been consumed. A consumer can deliberately rewind back to an old offset and re-consume data. Each message is uniquely identified by a 64-bit integer offset giving the byte position of the start of this message in the stream of all messages ever sent to that topic on that partition. Reads are done by giving the 64-bit logical offset of a message and a max chunk size.

The request is then passed through the client to a server and we get the response in return to a consumer request that we can examine. A request is always independent of any previous requests, i.e. the service is stateless. This API is completely stateless, with the topic and partition being passed in on every request.

The IO Object

The clients uses the IO object to communicate with the Apache Kafka server. The IO object is an interface layer between your application code and the network.

IO object is required to create objects of classes Kafka::Producer and Kafka::Consumer.

Kafka IO API is implemented by Kafka::IO class.

    use Kafka::IO;

    # connect to local server with the defaults
    my $io = Kafka::IO->new( host => "localhost" );

The main attributes of the IO object are:

  • host, and port are the IO object attributes denoting the server and the port name of the "documents" (messages) service we want to access (Apache Kafka server).

  • timeout specifies how much time we give remote servers to respond before the IO object disconnects and creates an internal exception.

The Producer Object

Kafka producer API is implemented by Kafka::Producer class.

    use Kafka::Producer;

    #-- Producer
    my $producer = Kafka::Producer->new( IO => $io );

    # Sending a single message
    $producer->send(
        "test",             # topic
        0,                  # partition
        "Single message"    # message
        );

    # Sending a series of messages
    $producer->send(
        "test",             # topic
        0,                  # partition
        [                   # messages
            "The first message",
            "The second message",
            "The third message",
        ]
        );

The main attributes of the producer request are:

  • The request method of the producer object is send().

  • topic and partition encode parameters of the messages we want to send.

  • messages is an arbitrary amount of data (a simple data string or an array of the data strings).

The Consumer Object

Kafka consumer API is implemented by Kafka::Consumer class.

    use Kafka::Consumer;

    $consumer = Kafka::Consumer->new( IO => $io );

The request methods of the consumer object are offsets() and fetch().

offsets method returns a reference to the list of offsets of received messages.

fetch method returns a reference to the list of received Kafka::Message objects.

    # Get a list of valid offsets up to max_number before the given time
    if ( my $offsets = $consumer->offsets(
        "test",             # topic
        0,                  # partition
        TIMESTAMP_EARLIEST, # time
        DEFAULT_MAX_OFFSETS # max_number
        ) )
    {
        foreach my $offset ( @$offsets )
        {
            print "Received offset: $offset\n";
        }
    }

    # Consuming messages
    if ( my $messages = $consumer->fetch(
        "test",             # topic
        0,                  # partition
        0,                  # offset
        DEFAULT_MAX_SIZE    # max_size
        ) )
    {
        foreach my $message ( @$messages )
        {
            if( $message->valid )
            {
                print "payload    : ", $message->payload,       "\n";
                print "offset     : ", $message->offset,        "\n";
                print "next_offset: ", $message->next_offset,   "\n";
            }
        }
    }

The arguments:

  • topic and partition specify the location of the messages we want to retrieve.

  • offset, max_size or time, max_number arguments are additional information that specify attributes of the messages we want to access.

  • time is the timestamp of the offsets before this time (ms). max_number is the maximum number of offsets to retrieve. This additional information about the request must be used to describe the range of the messages.

The Message Object

Kafka message API is implemented by Kafka::Message class.

    if( $message->valid )
    {
        print "payload    : ", $message->payload,       "\n";
        print "offset     : ", $message->offset,        "\n";
        print "next_offset: ", $message->next_offset,   "\n";
    }
    else
    {
        print "error      : ", $message->error,         "\n";
    }

Available methods of Kafka::Message object are:

  • payload A simple message received from the Apache Kafka server.

  • valid A message entry is valid if the CRC32 of the message payload matches to the CRC stored with the message.

  • error A description of the message inconsistence (currently only for message is not valid or compressed).

  • offset The offset beginning of the message in the Apache Kafka server.

  • next_offset The offset beginning of the next message in the Apache Kafka server.

Common

Both Kafka::Producer and Kafka::Consumer objects described above also have the following common methods:

  • RaiseError is a method which causes Kafka to die if an error is detected.

  • last_errorcode and last_error diagnostic methods. Use them to get detailed error message if server or the resource might not be available, access to the resource might be denied, or other things might have failed for some reason.

  • close method: terminates connection with Kafka and clean up.

        my $producer = Kafka::Producer->new(
            IO          => $io,
            RaiseError  => 1
            );
    
        unless ( $producer->send( "test", 0, "Single message" )
        {
            print
                "error code       : ", $producer->last_errorcode,   "\n",
                "error description: ", $producer->last_error,       "\n";
        }
    
        $producer->close;

EXPORT

None by default.

Additional constants are available for import, which can be used to define some type of parameters, and to identify various error cases.

These are the defaults:

KAFKA_SERVER_PORT

default Apache Kafka server port - 9092.

DEFAULT_TIMEOUT

timeout in secs, for gethostbyname, connect, blocking receive and send calls (could be any integer or floating-point type) - 0.5 sec.

TIMESTAMP_LATEST

timestamp of the offsets before this time (ms) special value -1 : latest

TIMESTAMP_EARLIEST

timestamp of the offsets before this time (ms) special value -2 : earliest

DEFAULT_MAX_SIZE

maximum size of message(s) to receive - 1MB

DEFAULT_MAX_OFFSETS

maximum number of offsets to retrieve - 100

MAX_SOCKET_REQUEST_BYTES

The maximum size of a request that the socket server will accept (protection against OOM). Default limit (as configured in server.properties) is 104857600

Possible error codes returned by last_errorcode method (complies with an array of descriptions @Kafka::ERROR):

ERROR_INVALID_MESSAGE_CODE

0 - Invalid message

ERROR_MISMATCH_ARGUMENT

1 - Mismatch argument

ERROR_WRONG_CONNECT

2 - You must configure a host to connect to!

ERROR_CANNOT_SEND

3 - Can't send

ERROR_CANNOT_RECV

4 - Can't receive

ERROR_CANNOT_BIND

5 - Can't bind

ERROR_CHECKSUM_ERROR

6 - Checksum error

ERROR_COMPRESSED_PAYLOAD

7 - Compressed payload

ERROR_NUMBER_OF_OFFSETS

7 - Amount received offsets does not match 'NUMBER of OFFSETS'

ERROR_NOTHING_RECEIVE

8 - Nothing to receive

ERROR_IN_ERRORCODE

9 - Response contains an error in 'ERROR_CODE'

Support for working with 64 bit elements of the Kafka Wire Format protocol on 32 bit systems:

BITS64

Know you are working on 64 or 32 bit system

GLOBAL VARIABLES

@Kafka::ERROR

Contain the descriptions for possible error codes returned by last_errorcode methods and functions of the package modules.

%Kafka::ERROR_CODE

Contain the descriptions for possible error codes in the ERROR_CODE box of Apache Kafka Wire Format protocol responses.

An Example

    use Kafka qw(
        KAFKA_SERVER_PORT
        DEFAULT_TIMEOUT
        TIMESTAMP_EARLIEST
        DEFAULT_MAX_OFFSETS
        DEFAULT_MAX_SIZE
        );
    use Kafka::IO;
    use Kafka::Producer;
    use Kafka::Consumer;

    #-- IO
    my $io = Kafka::IO->new( host => "localhost" );

    #-- Producer
    my $producer = Kafka::Producer->new( IO => $io );

    # Sending a single message
    $producer->send(
        "test",             # topic
        0,                  # partition
        "Single message"    # message
        );

    # Sending a series of messages
    $producer->send(
        "test",             # topic
        0,                  # partition
        [                   # messages
            "The first message",
            "The second message",
            "The third message",
        ]
        );

    $producer->close;

    #-- Consumer
    my $consumer = Kafka::Consumer->new( IO => $io );

    # Get a list of valid offsets up max_number before the given time
    my $offsets;
    if ( $offsets = $consumer->offsets(
        "test",             # topic
        0,                  # partition
        TIMESTAMP_EARLIEST, # time
        DEFAULT_MAX_OFFSETS # max_number
        ) )
    {
        foreach my $offset ( @$offsets )
        {
            print "Received offset: $offset\n";
        }
    }
    if ( !$offsets or $consumer->last_error )
    {
        print
            "(", $consumer->last_errorcode, ") ",
            $consumer->last_error, "\n";
    }

    # Consuming messages
    if ( my $messages = $consumer->fetch(
        "test",             # topic
        0,                  # partition
        0,                  # offset
        DEFAULT_MAX_SIZE    # Maximum size of MESSAGE(s) to receive
        ) )
    {
        foreach my $message ( @$messages )
        {
            if( $message->valid )
            {
                print "payload    : ", $message->payload,       "\n";
                print "offset     : ", $message->offset,        "\n";
                print "next_offset: ", $message->next_offset,   "\n";
            }
            else
            {
                print "error      : ", $message->error,         "\n";
            }
        }
    }

    $consumer->close;

$io, $producer, and $consumer are created once when the application starts up.

DEPENDENCIES

In order to install and use this package you will need Perl version 5.10 or later. Some modules within this package depend on other packages that are distributed separately from Perl. We recommend that you have the following packages installed before you install Kafka:

    Params::Util
    String::CRC32

Kafka package has the following optional dependencies:

    Test::Pod
    Test::Pod::Coverage
    Test::Exception
    CPAN::Meta
    Test::Deep
    Test::Distribution
    Test::Kwalitee

If the optional modules are missing, some "prereq" tests are skipped.

BUGS AND LIMITATIONS

Currently, the package does not implement send and response of compressed messages. Also does not implement the MULTIFETCH and MULTIPRODUCE requests.

Use only one Kafka::Mock object at the same time (it has class variables for the exchange of TCP server processes).

The Kafka package was written, tested, and found working on recent Linux distributions.

There are no known bugs in this package.

Please report problems to the "AUTHOR".

Patches are welcome.

MORE DOCUMENTATION

All modules contain detailed information on the interfaces they provide.

SEE ALSO

The basic operation of the Kafka package modules:

Kafka - constants and messages used by the Kafka package modules

Kafka::IO - object interface to socket communications with the Apache Kafka server

Kafka::Producer - object interface to the producer client

Kafka::Consumer - object interface to the consumer client

Kafka::Message - object interface to the Kafka message properties

Kafka::Protocol - functions to process messages in the Apache Kafka's wire format

Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems

Kafka::Mock - object interface to the TCP mock server for testing

A wealth of detail about the Apache Kafka and Wire Format:

Main page at http://incubator.apache.org/kafka/

Wire Format at http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/

Writing a Driver for Kafka at http://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka

AUTHOR

Sergey Gladkov, <sgladkov@trackingsoft.com>

CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Vlad Marchenko

COPYRIGHT AND LICENSE

Copyright (C) 2012-2013 by TrackingSoft LLC. All rights reserved.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.