NAME

Kafka::Protocol - functions to process messages in the Apache Kafka 0.7 Wire Format

VERSION

This documentation refers to Kafka::Protocol version 0.12

SYNOPSIS

Setting up:

    #-- Export
    use Kafka::Protocol qw(
        DEFAULT_MAX_SIZE
        REQUESTTYPE_PRODUCE
        REQUESTTYPE_FETCH
        REQUESTTYPE_MULTIFETCH
        REQUESTTYPE_MULTIPRODUCE
        REQUESTTYPE_OFFSETS
        produce_request
        fetch_request
        offsets_request
        fetch_response
        offsets_response
        );

    print "REQUEST_TYPE(s):\n";
    print
        REQUESTTYPE_PRODUCE,        " ",
        REQUESTTYPE_FETCH,          " ",
        REQUESTTYPE_MULTIFETCH,     " ",
        REQUESTTYPE_MULTIPRODUCE,   " ",
        REQUESTTYPE_OFFSETS,        "\n";

    #-- declaration of variables to test
    my $topic       = "test";
    my $partition   = 0;
    my $single_message = "The first message";
    my $series_of_messages = [
        "The first message",
        "The second message",
        "The third message",
        ];
    my $offset      = 0;
    my $max_size    = DEFAULT_MAX_SIZE;
    my $time        = -2;
    my $max_number  = 100;
    my ( $str, $hsh_ref, $arr_ref );

Requests:

    #-- Producer request:
    $str = unpack( "H*",
        produce_request( $topic, $partition, $single_message ) );
    $str = unpack( "H*",
        produce_request( $topic, $partition, $series_of_messages ) );

    #-- Offsets request:
    $str = unpack( "H*",
        offsets_request( $topic, $partition, $time, $max_number ) );

    #-- Fetch request:
    $str = unpack( "H*",
        fetch_request( $topic, $partition, $offset, $max_size ) );

Responses (look at the Sample Data section of the Kafka::Mock module for a %responses example):

    #-- Offsets response
    $arr_ref = offsets_response( \$responses{4} );

    #-- Fetch response
    $hsh_ref = fetch_response( \$responses{1} );

An error:

    eval { fetch_response( [] ) };  # expecting to die
                                    # 'Mismatch argument'
    print STDERR
            "(", Kafka::Protocol::last_errorcode(), ") ",
            Kafka::Protocol::last_error(), "\n";

DESCRIPTION

When producing messages, the driver has to specify what topic and partition to send the message to. When requesting messages, the driver has to specify what topic, partition, and offset it wants them pulled from.

While you can request "old" messages if you know their topic, partition, and offset, Kafka does not have a message index. You cannot efficiently query Kafka for the N-1000th message, or ask for all messages written between 30 and 35 minutes ago.

The main features of the Kafka::Protocol module are:

  • Supports parsing the Apache Kafka Wire Format protocol.

  • Supports Apache Kafka requests and responses (currently PRODUCE and FETCH without the compression codec attribute). The package currently provides the PRODUCE, FETCH and OFFSETS requests and the FETCH and OFFSETS responses.

  • Supports working with 64-bit elements of the Kafka Wire Format protocol on 32-bit systems.
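The 64-bit handling named in the last item can be sketched with the core Math::BigInt module. This is only an illustration, not the module's own code: the helper names pack_uint64_be and unpack_uint64_be are hypothetical, and the sketch assumes that 64-bit fields travel as 8 big-endian bytes.

```perl
use strict;
use warnings;
use Math::BigInt;

my $TWO_32 = Math::BigInt->new("4294967296");       # 2**32
my $TWO_64 = $TWO_32->copy->bmul($TWO_32);          # 2**64

# Hypothetical helper: encode a non-negative integer as 8 big-endian bytes.
sub pack_uint64_be {
    my ($value) = @_;
    my $n = Math::BigInt->new("$value");
    die "Mismatch argument\n"
        if $n->is_nan || $n->is_neg || $n->bcmp($TWO_64) >= 0;
    # In list context bdiv() returns ( quotient, remainder ).
    my ( $hi, $lo ) = $n->bdiv($TWO_32);
    return pack( "NN", $hi->numify, $lo->numify );
}

# Hypothetical helper: decode 8 big-endian bytes into a Math::BigInt.
sub unpack_uint64_be {
    my ($bytes) = @_;
    my ( $hi, $lo ) = unpack( "NN", $bytes );
    return Math::BigInt->new($hi)->bmul($TWO_32)->badd($lo);
}

my $packed = pack_uint64_be("4294967297");          # 2**32 + 1
print unpack( "H*", $packed ), "\n";                # 0000000100000001
print unpack_uint64_be($packed)->bstr, "\n";        # 4294967297
```

Splitting the value into two 32-bit halves keeps every pack() argument within the range of a 32-bit perl, which is why the value is passed around as a string or a Math::BigInt object.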

FUNCTIONS

The following functions are available in the Kafka::Protocol module.

  • Besides $topic and $partition, a request may carry additional parameters that select the messages to access: $offset and $max_size for FETCH, or $time and $max_number for OFFSETS.

produce_request( $topic, $partition, $messages )

Returns a binary PRODUCE request string encoded according to the Apache Kafka Wire Format protocol. If an argument is not valid, the error causes the program to halt (confess).

produce_request() takes the following arguments:

$topic

The $topic must be a normal non-false string of non-zero length.

$partition

The $partition must be a non-negative integer (of any length). That is, a positive integer, or zero.

$messages

$messages is an arbitrary amount of data: either a simple data string or a reference to an array of data strings.
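A minimal sketch of how the two accepted forms of $messages could be checked and normalized. The helper name normalize_messages is hypothetical, and which diagnostic the real module raises in each case is an assumption; only the messages "Mismatch argument" and "Invalid message" come from this documentation.

```perl
use strict;
use warnings;
use Carp qw(confess);

# Hypothetical helper: turn $messages into a reference to an array of
# plain data strings, or confess if the argument has the wrong shape.
sub normalize_messages {
    my ($messages) = @_;
    my @list;
    if ( !defined $messages ) {
        confess "Mismatch argument";
    }
    elsif ( !ref $messages ) {
        @list = ($messages);                # a single data string
    }
    elsif ( ref($messages) eq 'ARRAY' ) {
        @list = @$messages;                 # a series of data strings
    }
    else {
        confess "Mismatch argument";
    }
    for my $message (@list) {
        # Array elements must be simple strings, not references.
        confess "Invalid message" if !defined($message) || ref($message);
    }
    return \@list;
}

print scalar @{ normalize_messages("The first message") }, "\n";    # 1
print scalar @{ normalize_messages( [ "a", "b", "c" ] ) }, "\n";    # 3
eval { normalize_messages( [ "a", {} ] ) };
print "trapped: Invalid message\n" if $@ =~ /^Invalid message/;
```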

fetch_request( $topic, $partition, $offset, $max_size )

Returns a binary FETCH request string encoded according to the Apache Kafka Wire Format protocol. If an argument is not valid, the error causes the program to halt (confess).

fetch_request() takes the following arguments:

$topic

The $topic must be a normal non-false string of non-zero length.

$partition

The $partition must be a non-negative integer (of any length). That is, a positive integer, or zero.

$offset

Offset in topic and partition to start from (64 bits).

The argument must be a non-negative integer (of any length). That is, a positive integer, or zero. The argument may be a Math::BigInt integer on a 32-bit system.

$max_size

$max_size is the maximum size of the message set to return. The argument must be a positive integer (of any length).
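To make the four arguments concrete, here is a sketch of how a FETCH request could be laid out with pack(). The field order (4-byte length, 2-byte request type, length-prefixed topic, 4-byte partition, 8-byte offset, 4-byte maximum size) is an assumption based on the 0.7 Wire Format page listed under "SEE ALSO", and sketch_fetch_request is a hypothetical name, not the module's implementation. The sketch also assumes a 64-bit perl and an offset below 2**53.

```perl
use strict;
use warnings;

use constant REQUESTTYPE_FETCH => 1;    # value listed in the EXPORT section

# Hypothetical encoder for a FETCH request (assumed 0.7 field layout).
sub sketch_fetch_request {
    my ( $topic, $partition, $offset, $max_size ) = @_;

    # Request type, topic with a 2-byte length prefix, partition.
    my $header = pack( "n n/a* N", REQUESTTYPE_FETCH, $topic, $partition );

    # 64-bit offset written as two big-endian 32-bit halves.
    my $offset_field =
        pack( "NN", int( $offset / 4294967296 ), $offset % 4294967296 );

    my $body = $header . $offset_field . pack( "N", $max_size );
    return pack( "N/a*", $body );       # 4-byte length prefix, then the body
}

print unpack( "H*", sketch_fetch_request( "test", 0, 0, 1_048_576 ) ), "\n";
```

The "n/a*" and "N/a*" templates let pack() write the length prefix itself, so the lengths cannot drift out of step with the data they describe.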

offsets_request( $topic, $partition, $time, $max_number )

Returns a binary OFFSETS request string encoded according to the Apache Kafka Wire Format protocol. If an argument is not valid, the error causes the program to halt (confess).

offsets_request() takes the following arguments:

$topic

The $topic must be a normal non-false string of non-zero length.

$partition

The $partition must be a non-negative integer (of any length). That is, a positive integer, or zero.

$time

$time is a timestamp in milliseconds since the UNIX epoch. The request asks for offsets before this time.

The argument must be a number (that is, it is defined and Perl thinks it's a number): either a positive value or one of the special values below. The argument may be a Math::BigInt integer on a 32-bit system.

The special values -1 (latest), -2 (earliest) are allowed.

$max_number

$max_number is the maximum number of offsets to retrieve. The argument must be a positive integer (of any length).
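The special $time values -1 and -2 have to fit into the same 64-bit field as a millisecond timestamp. A minimal sketch, assuming the field is a signed big-endian two's-complement integer; pack_int64_be is a hypothetical helper, not part of the module.

```perl
use strict;
use warnings;
use Math::BigInt;

# Hypothetical helper: encode a signed integer as 8 big-endian bytes
# (two's complement), so that -1 and -2 survive the trip.
sub pack_int64_be {
    my ($time) = @_;
    my $n = Math::BigInt->new("$time");
    die "Mismatch argument\n" if $n->is_nan;
    # Wrap negative values around 2**64.
    $n->badd( Math::BigInt->new(2)->bpow(64) ) if $n->is_neg;
    # In list context bdiv() returns ( quotient, remainder ).
    my ( $hi, $lo ) = $n->bdiv("4294967296");
    return pack( "NN", $hi->numify, $lo->numify );
}

printf "%-14s %s\n", $_, unpack( "H*", pack_int64_be($_) )
    for "-1", "-2", "1356998400000";
```

Passing the timestamp as a string avoids any loss of precision on a perl without native 64-bit integers.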

offsets_response( $response )

Decodes the argument and returns a reference to a hash representing the structure of the OFFSETS Response. Offsets are Math::BigInt integers on a 32-bit system. The hash also contains an {error} item describing any error found in the structure of the argument (currently the only possible value is "Amount received offsets does not match 'NUMBER of OFFSETS'"). If the argument is not valid, the error causes the program to halt (confess).

offsets_response() takes the following argument:

$response

$response is a reference to the OFFSETS Response buffer. The buffer must be a string at least 6 bytes long.
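The decoding described above can be sketched as follows. The layout (4-byte response length, 2-byte error code, 4-byte NUMBER of OFFSETS, then 8-byte offsets) is an assumption based on the 0.7 Wire Format page, and the function name and hash keys are illustrative rather than the module's own.

```perl
use strict;
use warnings;
use Math::BigInt;

# Hypothetical decoder for an OFFSETS response buffer.
sub sketch_offsets_response {
    my ($response) = @_;
    die "Mismatch argument\n"
        unless ref($response) eq 'SCALAR'
        && defined $$response
        && length($$response) >= 6;

    my ( $length, $error_code ) = unpack( "N n", $$response );
    my %decoded = (
        length         => $length,
        error_code     => $error_code,
        number_offsets => 0,
        offsets        => [],
        error          => "",
    );
    return \%decoded if length($$response) < 10;    # header only

    # NUMBER of OFFSETS, then the 32-bit halves of every 8-byte offset.
    my ( $count, @halves ) = unpack( "x6 N N*", $$response );
    $decoded{number_offsets} = $count;
    while ( @halves >= 2 ) {
        my ( $hi, $lo ) = splice( @halves, 0, 2 );
        push @{ $decoded{offsets} },
            Math::BigInt->new($hi)->bmul("4294967296")->badd($lo);
    }
    $decoded{error} =
        "Amount received offsets does not match 'NUMBER of OFFSETS'"
        if @{ $decoded{offsets} } != $count;
    return \%decoded;
}

my $buffer  = pack( "N n N NN NN", 22, 0, 2, 0, 0, 0, 100 );
my $decoded = sketch_offsets_response( \$buffer );
print join( ",", @{ $decoded->{offsets} } ), "\n";   # 0,100
```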

fetch_response( $response )

Decodes the argument and returns a reference to a hash representing the structure of the FETCH Response. If the argument is not valid, the error causes the program to halt (confess).

fetch_response() takes the following argument:

$response

$response is a reference to the FETCH Response buffer. The buffer must be a string at least 6 bytes long.
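A rough sketch of walking the message set inside a FETCH response buffer. It assumes the 0.7 message layout (4-byte length, 1-byte magic, a compression byte only when magic is non-zero, 4-byte checksum, payload) after a 6-byte response header. The function name is hypothetical, and the sketch neither verifies checksums nor handles compression.

```perl
use strict;
use warnings;

# Hypothetical walker: return a reference to an array of message payloads.
sub sketch_fetch_payloads {
    my ($response) = @_;
    die "Mismatch argument\n"
        unless ref($response) eq 'SCALAR'
        && defined $$response
        && length($$response) >= 6;

    my @payloads;
    my $pos = 6;                        # skip response length and error code
    my $end = length $$response;
    while ( $pos + 4 <= $end ) {
        my $len = unpack( "N", substr( $$response, $pos, 4 ) );
        # Stop at an empty or truncated trailing message.
        last if $len < 1 || $pos + 4 + $len > $end;
        my $magic  = unpack( "C", substr( $$response, $pos + 4, 1 ) );
        my $header = $magic ? 6 : 5;    # magic [+ compression] + checksum
        last if $len < $header;
        push @payloads,
            substr( $$response, $pos + 4 + $header, $len - $header );
        $pos += 4 + $len;
    }
    return \@payloads;
}

# Build a test buffer; the checksum is a dummy 0 because it is not checked.
my $set = join "",
    map { pack( "N C C N a*", 6 + length($_), 1, 0, 0, $_ ) } "one", "two!";
my $buffer = pack( "N n", 2 + length($set), 0 ) . $set;
print join( "|", @{ sketch_fetch_payloads( \$buffer ) } ), "\n";   # one|two!
```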

last_errorcode

This method returns an error code that specifies the position of the description in the @Kafka::ERROR array. Analysing this code can help determine the cause of the error.

The server or the resource might not be available, access to the resource might be denied or other things might have failed for some reason.

last_error

This method returns an error message that contains information about the encountered failure. Messages returned from this method may contain additional details and may therefore differ from the entries of the @Kafka::ERROR array.

EXPORT

None by default.

Additional constants are available for import. They can be used with the module functions and to identify REQUEST types (see the "SEE ALSO" section):

  • 0 - REQUESTTYPE_PRODUCE

  • 1 - REQUESTTYPE_FETCH

  • 2 - REQUESTTYPE_MULTIFETCH

  • 3 - REQUESTTYPE_MULTIPRODUCE

  • 4 - REQUESTTYPE_OFFSETS

DIAGNOSTICS

Kafka::Protocol is not a user module and any error in its functions is FATAL. FATAL errors will cause the program to halt (confess), since the problem is so severe that it would be dangerous to continue. (Such errors can always be trapped with eval; under the circumstances, dying is the best thing to do.)

Mismatch argument

This means that you didn't give the right argument to one of the functions.

Invalid message

This means that the array of messages contains a reference instead of a simple data string.

For a more detailed error description, look at the message returned by the Kafka::Protocol::last_error function.
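Trapping a fatal error with eval, as mentioned at the start of this section, might look like the sketch below. It does not load Kafka::Protocol: checked_partition is a hypothetical stand-in for a module function that confesses on bad input, and trap_fatal is an illustrative wrapper.

```perl
use strict;
use warnings;
use Carp qw(confess);

# Hypothetical stand-in for a module function that confesses on bad input.
sub checked_partition {
    my ($partition) = @_;
    confess "Mismatch argument"
        unless defined $partition && $partition =~ /\A[0-9]+\z/;
    return $partition;
}

# Run a code reference; return ( $result, undef ) or ( undef, $message ).
sub trap_fatal {
    my ( $code, @args ) = @_;
    my $result;
    my $ok = eval { $result = $code->(@args); 1 };
    return ( $result, undef ) if $ok;
    my ($first_line) = split /\n/, $@;  # drop the stack trace confess adds
    return ( undef, $first_line );
}

my ( $value, $error ) = trap_fatal( \&checked_partition, -1 );
print defined $error ? "trapped: $error\n" : "ok: $value\n";
```

Having the eval block return 1 on success distinguishes a trapped error from a call that legitimately returns a false value such as partition 0.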

SEE ALSO

The basic operation of the Kafka package modules:

Kafka - constants and messages used by the Kafka package modules

Kafka::IO - object interface to socket communications with the Apache Kafka server

Kafka::Producer - object interface to the producer client

Kafka::Consumer - object interface to the consumer client

Kafka::Message - object interface to the Kafka message properties

Kafka::Protocol - functions to process messages in the Apache Kafka wire format

Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems

Kafka::Mock - object interface to the TCP mock server for testing

A wealth of detail about Apache Kafka and the Wire Format:

Main page at http://incubator.apache.org/kafka/

Wire Format at http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/

Writing a Driver for Kafka at http://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka

AUTHOR

Sergey Gladkov, <sgladkov@trackingsoft.com>

CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Vlad Marchenko

COPYRIGHT AND LICENSE

Copyright (C) 2012-2013 by TrackingSoft LLC. All rights reserved.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.