Kafka::Mock - object interface to the TCP mock server for testing
This documentation refers to Kafka::Mock version 0.12
Kafka::Mock
To use the Mock server, the application might be started and finished as below:
use Kafka qw( DEFAULT_TIMEOUT ); use Kafka::Mock; # Mock server my $mock = Kafka::Mock->new( requests => \%requests, responses => \%responses, timeout => 0.1 # Optional, secs float ); #-- IO my $io = Kafka::IO->new( host => "localhost", port => $mock->port ); # ... the application body # to kill the mock server process $mock->close;
Look at the "Sample Data" section for the %requests and %responses sample.
%requests
%responses
Use only one Kafka::Mock object at the same time.
This Kafka::Mock mock server provides a local TCP server with a set of behaviours for use in testing of Kafka clients with the Kafka::IO object. It is intended to be use when testing the Kafka::Producer and Kafka::Consumer client interfaces. Testing allows you to determine the settings for the Apache Kafka server instance and timeouts for your clients.
Kafka mock server API is implemented by Kafka::Mock class.
The main features of the Kafka::Mock class are:
Provides an object oriented model of communication.
Simple mock server instance for testing without an Apache Kafka server.
Allows you to set and clear the delay in these positions in the reception and transmission of data.
Provides diagnostic information on the commands received and fulfilled by the server.
new
Creates a local TCP server process. Establishes bidirectional communication to the server using socketpair. The server receives the hash references for the requests and appropriate responses to be returned. Also, the server receives the value of the delay between taking the requests.
socketpair
Returns the created a Kafka::Mock object, or error will cause the program to halt (confess) if the argument is not a valid.
confess
The Mock server returns to the IO object the response data that corresponds to the request from the %requests (based on the REQUEST_TYPE of the request).
new() takes arguments, these arguments are in key-value pairs. The following arguments are currently recognized:
new()
timeout => $timeout
Optional, default = DEFAULT_TIMEOUT .
DEFAULT_TIMEOUT is the default timeout that can be imported from the Kafka module.
$timeout specifies how much time we give remote server before it goes into the next reception of a request. The $timeout in secs (could be any integer or floating-point type).
$timeout
requests => \%requests
%requests is the reference to the hash denoting the control bytes strings of the requests.
responses => \%responses
%responses is the reference to the hash denoting the control bytes strings of the responses.
The keys of the %requests, and %responses must comply with the Apache Kafka request types:
0 - PRODUCE
1 - FETCH
2 - MULTIFETCH
3 - MULTIPRODUCE
4 - OFFSETS
The values of the %requests, and %responses should be in the "H*" format for the command pack.
"H*"
pack
The following methods are defined for the Kafka::Mock class:
port
The method returns the Mock server port to use for Kafka::IO object with "localhost" as his host attribute.
"localhost"
last_request
For the last query string of bytes received by the server (when invoked with no arguments). The resulting string can be used for comparison with the control passed to the query.
# received a last request my $last_request = $mock->last_request;
The following modes are used only for internal testing of the mock server:
# received a report on delays my $delays = $mock->last_request( "note" ); # report completed delays my $sleep = $server->last_request( "sleep" );
delay( $mode, $position, $delay )
$mock->delay( "request", 10, 0.5 ); $mock->delay( "response", 10, 0.5 );
To set a delay in the transmission or reception of data server.
To set the required number of delays should perform successive calls to delay.
delay
$mode should be set to "request" or "response" for the job delays in receipt of a request or response in the transmission, respectively.
$mode
"request"
"response"
$position specifies the byte position in the message will be followed by performed delay $delay seconds. $position must be a positive integer. That is, it is defined and Perl thinks it's an integer. For mode "request" argument to $position should be set not less than 6 as the request identifier should be holistic (REQUEST_LENGTH + REQUEST_TYPE).
$position
$delay
$delay must be a positive number. That is, it is defined and Perl thinks it's a number.
Error will cause the program to halt (confess) if an argument is not valid.
clear
The method to delete all previously specified delays.
close
The method to kill the mock server process and clean up.
my %requests = ( 0 => # PRODUCE Request # Request Header '0000005f' # REQUEST_LENGTH .'0000' # REQUEST_TYPE .'0004' # TOPIC_LENGTH .'74657374' # TOPIC ("test") .'00000000' # PARTITION # PRODUCE Request .'0000004f' # MESSAGES_LENGTH # MESSAGE .'00000016' # LENGTH .'00' # MAGIC .'' # COMPRESSION .'d94a22be' # CHECKSUM # "The first message" .'546865206669727374206d657373616765' # PAYLOAD # MESSAGE .'00000017' # LENGTH .'00' # MAGIC .'' # COMPRESSION .'a3810845' # CHECKSUM # "The second message" .'546865207365636f6e64206d657373616765' # PAYLOAD # MESSAGE .'00000016' # LENGTH .'00' # MAGIC .'' # COMPRESSION .'58611780' # CHECKSUM # "The third message" .'546865207468697264206d657373616765', # PAYLOAD 1 => # FETCH Request # Request Header '00000018' # REQUEST_LENGTH .'0001' # REQUEST_TYPE .'0004' # TOPIC_LENGTH .'74657374' # TOPIC ("test") .'00000000' # PARTITION # FETCH Request .'0000000000000000' # OFFSET .'00100000', # MAX_SIZE (1MB) 2 => '', # MULTIFETCH Request 3 => '', # MULTIPRODUCE Reqst 4 => # OFFSETS Request # Request Header '00000018' # REQUEST_LENGTH .'0004' # REQUEST_TYPE .'0004' # TOPIC_LENGTH .'74657374' # TOPIC ("test") .'00000000' # PARTITION # OFFSETS Request .'fffffffffffffffe' # TIME -2: earliest .'00000064', # MAX NUM OFFSTS 100 ); my %responses = ( 0 => '', # PRODUCE Response 1 => # FETCH Response # Response Header '00000051' # RESPONSE_LENGTH .'0000' # ERROR_CODE # MESSAGE .'00000016' # LENGTH .'00' # MAGIC .'' # COMPRESSION .'d94a22be' # CHECKSUM # "The first message" .'546865206669727374206d657373616765' # PAYLOAD # MESSAGE .'00000017' # LENGTH .'00' # MAGIC .'' # COMPRESSION .'a3810845' # CHECKSUM # "The second message" .'546865207365636f6e64206d657373616765' # PAYLOAD # MESSAGE .'00000016' # LENGTH .'00' # MAGIC .'' # COMPRESSION .'58611780' # CHECKSUM # "The third message" .'546865207468697264206d657373616765', # PAYLOAD 2 => '', # MULTIFETCH Respns 3 => '', # MULTIPROD Respns 4 => # OFFSETS Response # Response Header '0000000e' # RESPONSE_LENGTH .'0000' # ERROR_CODE # OFFSETS Response .'00000001' # NUMBER of OFFSETS .'0000000000000000' # OFFSET );
Kafka::Mock is not an end user module and any error is FATAL. FATAL errors will cause the program to halt (confess), since the problem is so severe that it would be dangerous to continue. (This can always be trapped with eval. Under the circumstances, dying is the best thing to do).
eval
Mismatch argument
This means that you didn't give the right argument to the new constructor.
When working with the Kafka::Mock module errors may occur are listed in the array @Kafka::Mock::ERROR.
@Kafka::Mock::ERROR
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules
Kafka::IO - object interface to socket communications with the Apache Kafka server
Kafka::Producer - object interface to the producer client
Kafka::Consumer - object interface to the consumer client
Kafka::Message - object interface to the Kafka message properties
Kafka::Protocol - functions to process messages in the Apache Kafka's wire format
Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems
A wealth of detail about the Apache Kafka and Wire Format:
Main page at http://incubator.apache.org/kafka/
Wire Format at http://cwiki.apache.org/confluence/display/KAFKA/Wire+Format/
Writing a Driver for Kafka at http://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
Sergey Gladkov, <sgladkov@trackingsoft.com>
Alexander Solovey
Jeremy Jordan
Vlad Marchenko
Copyright (C) 2012-2013 by TrackingSoft LLC. All rights reserved.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
To install Kafka, copy and paste the appropriate command in to your terminal.
cpanm
cpanm Kafka
CPAN shell
perl -MCPAN -e shell install Kafka
For more information on module installation, please visit the detailed CPAN module installation guide.