Kafka::IO - object interface to socket communications with the Apache Kafka 0.7 server without using the Apache ZooKeeper
This documentation refers to Kafka::IO version 0.12
Setting up:
    use Kafka qw( KAFKA_SERVER_PORT DEFAULT_TIMEOUT );
    use Kafka::IO;

    my $io;

    # An invalid timeout with RaiseError => 1 makes the constructor die
    eval {
        $io = Kafka::IO->new(
            host       => "localhost",
            port       => KAFKA_SERVER_PORT,
            timeout    => "bad thing",
            RaiseError => 1
        );
    };
    print "expecting to die: (", Kafka::IO::last_errorcode, ") ", Kafka::IO::last_error, "\n" if $@;

    # With RaiseError => 0 the constructor returns undef on error instead of
    # dying, so the result is checked directly rather than through $@
    unless ( $io = Kafka::IO->new(
        host       => "localhost",
        port       => KAFKA_SERVER_PORT,
        timeout    => DEFAULT_TIMEOUT,  # Optional, default = DEFAULT_TIMEOUT
        RaiseError => 0                 # Optional, default = 0
        ) )
    {
        print "unexpected error: (", Kafka::IO::last_errorcode, ") ", Kafka::IO::last_error, "\n";
    }
Producer:
    use Kafka::Producer;

    my $producer = Kafka::Producer->new(
        IO         => $io,
        RaiseError => 0     # Optional, default = 0
    );

    # ... the application body

    # Closes the producer and cleans up
    $producer->close;
Or Consumer:
    use Kafka::Consumer;

    my $consumer = Kafka::Consumer->new(
        IO         => $io,
        RaiseError => 0     # Optional, default = 0
    );

    # ... the application body

    # Closes the consumer and cleans up
    $consumer->close;
The main features of the Kafka::IO class are:
Provides an object-oriented model of communication.
Provides a class that lets you write Apache Kafka 0.7 clients without using the Apache ZooKeeper service.
new
Establishes a TCP socket connection to the given host and port and returns the newly created Kafka::IO object.
An error will cause the program to halt or the constructor will return the undefined value, depending on the value of the RaiseError attribute.
Use the Kafka::IO methods "last_errorcode" and "last_error" to get information about the error.
new() takes arguments in key-value pairs. The following arguments are currently recognized:
host => $host
$host is the Apache Kafka host to connect to. It can be a hostname or an IP address in the "xx.xx.xx.xx" form.
port => $port
Optional, default = KAFKA_SERVER_PORT.
$port is the port number of the Apache Kafka service to connect to. The $port should be a number.
KAFKA_SERVER_PORT is the default Apache Kafka server port (9092).
timeout => $timeout
Optional, default = DEFAULT_TIMEOUT.
DEFAULT_TIMEOUT is the default timeout that can be imported from the Kafka module.
$timeout specifies how long to wait for the remote server to respond before the IO object disconnects and raises an internal exception. The $timeout is given in seconds (any integer or floating-point value) and applies to gethostbyname, connect, and blocking receive and send calls.
The first connect will never fail with a timeout as the connect call will not block.
RaiseError => $mode
Optional, default = 0.
If "RaiseError" is true, an error halts the program: confess is called when an argument is not valid, and die in every other error case. If "RaiseError" is not true, the undefined value is returned when any error occurs.
The following methods are defined for the Kafka::IO class:
send( $message )
Sends a message on the Kafka::IO object's socket. Reconnects first if the socket is not connected.
The argument must be a byte string.
Returns the number of characters sent. On error, returns the undefined value when "RaiseError" is not true.
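Because send expects bytes rather than Perl characters, text has to be encoded before it is passed in. The following is a minimal sketch of that step and of the return-value check, assuming an object created with RaiseError => 0. The helper name send_text is hypothetical and not part of Kafka::IO, and a real request would also have to be in the Apache Kafka wire format (see Kafka::Protocol), which this sketch does not attempt:

```perl
use strict;
use warnings;
use Encode qw( encode );

# Hypothetical helper (not part of Kafka::IO): encode text to bytes and send
# it through any object offering the send and last_error methods described
# in this document.
sub send_text {
    my ( $io, $text ) = @_;

    my $bytes = encode( 'UTF-8', $text );   # characters -> byte string
    my $sent  = $io->send( $bytes );

    # With RaiseError not true, send signals failure by returning undef
    unless ( defined $sent ) {
        warn 'send failed: ', $io->last_error, "\n";
        return;
    }
    return $sent;   # the count reported by send
}
```

With a connected Kafka::IO object in $io, a call would look like send_text( $io, $payload ).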
receive( $length )
Receives a message on the Kafka::IO object's socket. Attempts to receive $length bytes of data.
Returns a reference to the received message. On error, returns the undefined value when "RaiseError" is not true.
The argument must be a positive number. That is, it is defined and Perl thinks it's a number.
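Since receive returns a reference rather than the data itself, the caller has to dereference the result after checking it. A minimal sketch, assuming the reference is a scalar reference and that the object was created with RaiseError => 0; the helper name receive_bytes is hypothetical and not part of Kafka::IO:

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Kafka::IO): request $length bytes and
# return them as a plain string, or undef on failure.
sub receive_bytes {
    my ( $io, $length ) = @_;

    my $ref = $io->receive( $length );   # reference to the data, or undef
    unless ( defined $ref ) {
        warn 'receive failed: ', $io->last_error, "\n";
        return;
    }
    return $$ref;   # dereference to get the bytes themselves
}
```

With a connected Kafka::IO object in $io, receive_bytes( $io, 4 ) would return the next four bytes as a string.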
close
Closes the Kafka::IO object and cleans up.
last_errorcode
This method returns an error code that gives the position of the corresponding description in the @Kafka::ERROR array. Analysing this code can help determine the cause of the error.
The server or the resource might not be available, access to the resource might be denied, or something else might have failed.
The code corresponds to an entry in the @Kafka::ERROR array of descriptions.
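The lookup described above can be sketched as a small function that takes the code and a reference to the array of descriptions. The helper name error_description is hypothetical, and the fallback texts are this sketch's own rather than anything Kafka::IO produces:

```perl
use strict;
use warnings;

# Hypothetical helper: map an error code to its generic description.
# $descriptions is an array reference, for example \@Kafka::ERROR.
sub error_description {
    my ( $code, $descriptions ) = @_;

    return 'no error code' unless defined $code;

    # Only non-negative integers are valid array positions here
    my $text = $code =~ /\A\d+\z/ ? $descriptions->[$code] : undef;
    return defined $text ? $text : "unknown error code $code";
}
```

With the Kafka and Kafka::IO modules loaded, a call would look like error_description( Kafka::IO::last_errorcode(), \@Kafka::ERROR ).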
last_error
This method returns an error message that contains information about the encountered failure. Messages returned from this method may contain additional details and do not necessarily match the entries of the @Kafka::ERROR array.
RaiseError
This method controls error handling: when "RaiseError" is set to false, the undefined value is returned when an error is detected; when "RaiseError" is set to true, the program dies automatically (this can always be trapped with eval).
The value must be a non-negative integer, that is, a positive integer or zero.
Always check for errors when "RaiseError" is not set to true.
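Code that has to work with objects in either "RaiseError" mode can trap the die with eval and check for the undefined value in one place. A minimal sketch of that pattern; the helper name call_checked is hypothetical and not part of Kafka::IO, and it is only meant for methods such as send and receive whose successful result is always defined:

```perl
use strict;
use warnings;

# Hypothetical helper: call a method in either RaiseError mode and return
# a ( $result, $error ) pair, where exactly one of the two is defined.
sub call_checked {
    my ( $io, $method, @args ) = @_;

    my $result;
    my $ok = eval { $result = $io->$method( @args ); 1 };

    # RaiseError true: the call died and eval trapped it
    return ( undef, "died: $@" ) unless $ok;

    # RaiseError not true: undef signals the failure
    return ( undef, $io->last_error ) unless defined $result;

    return ( $result, undef );
}
```

With a connected Kafka::IO object in $io, a call would look like my ( $sent, $error ) = call_checked( $io, 'send', $bytes ).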
Look at the RaiseError description for additional information on error handling.
Two methods are available for analysing an error: "last_errorcode" and the more descriptive "last_error".
Mismatch argument
This means that you didn't give the right argument to the new constructor or to another method.
Can't send
This means that the message can't be sent on a Kafka::IO object socket.
Can't recv
This means that the message can't be received on a Kafka::IO object socket.
Can't bind
This means that the TCP socket connection can't be established on the given host and port.
For a fuller description of an error, always look at the message from the "last_error" method or from the Kafka::IO::last_error class method.
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules
Kafka::IO - object interface to socket communications with the Apache Kafka server
Kafka::Producer - object interface to the producer client
Kafka::Consumer - object interface to the consumer client
Kafka::Message - object interface to the Kafka message properties
Kafka::Protocol - functions to process messages in the Apache Kafka's wire format
Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems
Kafka::Mock - object interface to the TCP mock server for testing
More detail about Apache Kafka and its wire format:
Main page at http://incubator.apache.org/kafka/
Wire Format at http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/
Writing a Driver for Kafka at http://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
Sergey Gladkov, <sgladkov@trackingsoft.com>
Alexander Solovey
Jeremy Jordan
Vlad Marchenko
Copyright (C) 2012-2013 by TrackingSoft LLC. All rights reserved.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.