Commit a025280b authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

adding basic load test tools

parent d530418f
......@@ -5,3 +5,6 @@ build/
test/.bundle/
test/tmp/
misc/*.textile*
*.o
misc/tools/publisher
misc/tools/subscriber
all: publisher subscriber
subscriber: subscriber.o util.o
gcc -g -Oo subscriber.o util.o -o subscriber -largtable2
publisher: publisher.o util.o
gcc -g -Oo publisher.o util.o -o publisher -largtable2
subscriber.o: subscriber.c
gcc -g -c subscriber.c
publisher.o: publisher.c
gcc -g -c publisher.c
util.o: util.c
gcc -g -c util.c
clean:
rm -rf *o publisher subscriber
Copyright (C) 2011 Michael Costello, Wandenberg Peixoto <wandenberg@gmail.com>
These tools, publisher and subscriber, were developed only to do some load tests on push stream module.
Their use is very restricted and is not intended to cover all possible configuration for the module.
The first version was developed by Michael Costello and I made some improvements to distribute it.
Feel free to help continuous improvement.
Any feedbacks will be welcome.
=============
Requirements:
=============
lib argtable2
GCC, make, the usual guys
epoll event support
================
Developer Guide:
================
The basic configuration used on the load tests are listed bellow.
To compile the tools only execute a make.
To see all options use:
./publisher --help
./subscriber --help
Pay attention on default values to run your tests.
====================
Basic Configuration:
====================
pid logs/nginx.pid;
error_log logs/nginx-main_error.log debug;
worker_rlimit_core 500M;
working_directory /tmp/nginx;
worker_processes 2;
events {
worker_connections 1024;
use epoll;
}
http {
include mime.types;
default_type application/octet-stream;
access_log logs/nginx-http_access.log;
error_log logs/nginx-http_error.log debug;
push_stream_shared_memory_size 500M;
server {
listen 9080 default_server;
server_name localhost;
location /channels-stats {
push_stream_channels_statistics;
set $push_stream_channel_id $arg_id;
}
location /pub {
push_stream_publisher admin;
set $push_stream_channel_id $arg_id;
push_stream_store_messages off;
push_stream_keepalive on;
}
location ~ /sub/(.*) {
push_stream_subscriber;
set $push_stream_channels_path $1;
push_stream_message_template "~text~:~id~:~channel~";
}
}
}
/*
Copyright (C) 2011 Michael Costello, Wandenberg Peixoto <wandenberg@gmail.com>
Usage './publisher --help' to see option
*/
#include <argtable2.h>
#include "util.h"
void write_message(Connection *connection, Statistics *stats);
void read_response(Connection *connection, Statistics *stats, char *buffer, int buffer_len);
int
main_program(int num_messages, int num_channels, int num_connections, const char *server_hostname, int server_port, int timeout)
{
struct sockaddr_in server_address;
int main_sd = -1, num_events = 0, i, event_mask, channels_per_connection, num, start_time = 0, iters_to_next_summary = 0;
Connection *connections = NULL, *connection;
Statistics stats = {0,0,0,0,0};
int exitcode = EXIT_SUCCESS;
struct epoll_event events[MAX_EVENTS];
char buffer[BIG_BUFFER_SIZE];
info("Publisher starting up\n");
info("Publish: %d messages to %d channels on server: %s:%d\n", num_messages, num_channels, server_hostname, server_port);
if ((fill_server_address(server_hostname, server_port, &server_address)) != 0) {
error2("ERROR host name not found\n");
}
if ((main_sd = epoll_create(200 /* this size is not used on Linux kernel 2.6.8+ */)) < 0) {
error3("Failed %d creating main epoll socket\n", errno);
}
if ((connections = init_connections(num_connections, &server_address, main_sd)) == NULL) {
error2("Failed to create to connections\n");
}
stats.requested_connections = num_connections;
channels_per_connection = num_channels / num_connections;
for (i = 0; i < num_connections; i++) {
num = i * channels_per_connection;
connections[i].channel_start = num;
num += channels_per_connection - 1;
connections[i].channel_end = ((num > num_channels) || (i == (num_connections - 1))) ? num_channels - 1 : num;
}
// infinite loop
debug("Entering Infinite Loop\n");
iters_to_next_summary = ITERATIONS_TILL_SUMMARY_PER_TIMEOUT/timeout;
for(;;) {
if ((num_events = epoll_wait(main_sd, events, MAX_EVENTS, timeout)) < 0) {
error3("epoll_wait failed\n");
}
for (i = 0; i < num_events; i++) {
event_mask = events[i].events;
connection = (Connection *)(events[i].data.ptr);
if ((connection->message_count >= num_messages) && (connection->channel_id > connection->channel_end)) {
// remove write flag from event
if (change_connection(connection, EPOLLIN | EPOLLHUP) < 0) {
error2("Failed creating socket for connection = %d\n", connection->index);
}
if (event_mask & EPOLLOUT) { // WRITE
continue;
}
}
if (event_mask & EPOLLHUP) { // SERVER HUNG UP
debug("EPOLLHUP\n");
info("Server hung up on conncetion %d. Reconecting...\n", connection->index);
sleep(1);
reopen_connection(connection);
continue;
}
if (event_mask & EPOLLERR) {
debug("EPOLLERR\n");
info("Server returned an error on connection %d. Reconecting...\n", connection->index);
reopen_connection(connection);
continue;
}
if (event_mask & EPOLLIN) { // READ
debug("----------READ AVAILABLE-------\n");
if (connection->state == CONNECTED) {
read_response(connection, &stats, buffer, BIG_BUFFER_SIZE);
}
}
if (event_mask & EPOLLOUT) { // WRITE
debug("----------WRITE AVAILABLE-------\n");
if (start_time == 0) {
start_time = time(NULL);
}
if (connection->state == CONNECTING) {
connection->state = CONNECTED;
stats.connections++;
debug("Connection opened for index=%d\n", connection->index);
}
write_message(connection, &stats);
}
}
if (iters_to_next_summary-- <= 0) {
iters_to_next_summary = ITERATIONS_TILL_SUMMARY_PER_TIMEOUT/timeout;
summary("Connections=%ld, Messages=%ld BytesWritten=%ld Msg/Sec=%0.2f\n", stats.connections, stats.messages, stats.bytes_written, calc_message_per_second(stats.messages, start_time));
}
if (stats.messages >= (num_channels * num_messages)) {
summary("Connections=%ld, Messages=%ld BytesWritten=%ld Msg/Sec=%0.2f\n", stats.connections, stats.messages, stats.bytes_written, calc_message_per_second(stats.messages, start_time));
for (i = 0; i < num_connections; i++) {
close_connection(&connections[i]);
stats.connections--;
}
exitcode = EXIT_SUCCESS;
goto exit;
}
}
exit:
if (connections != NULL) free(connections);
return exitcode;
}
void
write_message(Connection *connection, Statistics *stats)
{
char buffer[BUFFER_SIZE];
int len = 0, bytes_written = 0;
if ((connection->channel_id <= connection->channel_start) || (connection->channel_id > connection->channel_end)) {
connection->channel_id = connection->channel_start;
connection->message_count++;
// gives a message payload of 140 bytes
connection->content_length = sprintf(connection->content_buffer, "**MSG** msg=%06d 012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789", connection->message_count);
connection->content_buffer[connection->content_length] = '\0';
}
len = sprintf(buffer, "POST /pub?id=my_channel_%ld HTTP/1.1\r\nHost: loadtest\r\nContent-Length: %d\r\n\r\n%s", connection->channel_id, connection->content_length, connection->content_buffer);
if (write_connection(connection, stats, buffer, len) == EXIT_FAILURE) {
reopen_connection(connection);
return;
}
connection->channel_id++;
}
void
read_response(Connection *connection, Statistics *stats, char *buffer, int buffer_len)
{
int bytes_read = 0, bad_count = 0, ok_count = 0;
bytes_read = read(connection->sd, buffer, buffer_len - 1);
if (bytes_read < 0) {
error("Error reading from socket for connection %d\n", connection->index);
reopen_connection(connection);
return;
}
if (bytes_read == 0) { // server disconnected us
// reconnect
info("Server disconnected as requested %d.\n", connection->index);
close_connection(connection);
return;
}
stats->bytes_read += bytes_read;
buffer[bytes_read] = '\0';
debug("Read %d bytes\n", bytes_read);
trace("Read Message: %s\n", buffer);
bad_count = count_strinstr(buffer, "HTTP/1.1 4");
bad_count += count_strinstr(buffer, "HTTP/1.1 5");
if (bad_count > 0) {
info("Recevied error. Buffer is %s\n", buffer);
reopen_connection(connection);
return;
}
ok_count = count_strinstr(buffer, "HTTP/1.1 200 OK");
stats->messages += ok_count;
}
int
main(int argc, char **argv)
{
struct arg_int *messages = arg_int0("m", "messages", "<n>", "define number of messages to publish in each channel (default is 1)");
struct arg_int *channels = arg_int0("c", "channels", "<n>", "define number of channels (default is 1)");
struct arg_int *publishers = arg_int0("p", "publishers", "<n>", "define number of publishers (default is 1)");
struct arg_str *server_name = arg_str0("S", "server", "<hostname>", "server hostname where messages will be published (default is \"127.0.0.1\")");
struct arg_int *server_port = arg_int0("P", "port", "<n>", "server port where messages will be published (default is 9080)");
struct arg_int *timeout = arg_int0(NULL, "timeout", "<n>", "timeout when waiting events on communication to the server (default is 1000)");
struct arg_int *verbose = arg_int0("v", "verbose", "<n>", "increase output messages detail (0 (default) - no messages, 1 - info messages, 2 - debug messages, 3 - trace messages");
struct arg_lit *help = arg_lit0(NULL, "help", "print this help and exit");
struct arg_lit *version = arg_lit0(NULL, "version", "print version information and exit");
struct arg_end *end = arg_end(20);
void* argtable[] = { messages, channels, publishers, server_name, server_port, timeout, verbose, help, version, end };
const char* progname = "publisher";
int nerrors;
int exitcode = EXIT_SUCCESS;
/* verify the argtable[] entries were allocated sucessfully */
if (arg_nullcheck(argtable) != 0) {
/* NULL entries were detected, some allocations must have failed */
printf("%s: insufficient memory\n", progname);
exitcode = EXIT_FAILURE;
goto exit;
}
/* set any command line default values prior to parsing */
messages->ival[0] = DEFAULT_NUM_MESSAGES;
publishers->ival[0] = DEFAULT_CONCURRENT_CONN;
channels->ival[0] = DEFAULT_NUM_CHANNELS;
server_name->sval[0] = DEFAULT_SERVER_HOSTNAME;
server_port->ival[0] = DEFAULT_SERVER_PORT;
timeout->ival[0] = DEFAULT_TIMEOUT;
verbose->ival[0] = 0;
/* Parse the command line as defined by argtable[] */
nerrors = arg_parse(argc, argv, argtable);
/* special case: '--help' takes precedence over error reporting */
if (help->count > 0) {
printf(DESCRIPTION_PUBLISHER, progname, VERSION, COPYRIGHT);
printf("Usage: %s", progname);
arg_print_syntax(stdout, argtable, "\n");
arg_print_glossary(stdout, argtable, " %-25s %s\n");
exitcode = EXIT_SUCCESS;
goto exit;
}
/* special case: '--version' takes precedence error reporting */
if (version->count > 0) {
printf(DESCRIPTION_PUBLISHER, progname, VERSION, COPYRIGHT);
exitcode = EXIT_SUCCESS;
goto exit;
}
if (publishers->ival[0] > channels->ival[0]) {
printf("Publisher number (%d) can not be greater than Channel number (%d).\n", publishers->ival[0], channels->ival[0]);
exitcode = EXIT_FAILURE;
goto exit;
}
/* If the parser returned any errors then display them and exit */
if (nerrors > 0) {
/* Display the error details contained in the arg_end struct.*/
arg_print_errors(stdout, end, progname);
printf("Try '%s --help' for more information.\n", progname);
exitcode = EXIT_FAILURE;
goto exit;
}
verbose_messages = verbose->ival[0];
/* normal case: take the command line options at face value */
exitcode = main_program(messages->ival[0], channels->ival[0], publishers->ival[0], server_name->sval[0], server_port->ival[0], timeout->ival[0]);
exit:
/* deallocate each non-null entry in argtable[] */
arg_freetable(argtable, sizeof(argtable) / sizeof(argtable[0]));
return exitcode;
}
/*
Copyright (C) 2011 Michael Costello, Wandenberg Peixoto <wandenberg@gmail.com>
Usage './subscriber --help' to see option
*/
#include <argtable2.h>
#include "util.h"
void subscribe_channels(Connection *connection, Statistics *stats);
void read_response(Connection *connection, Statistics *stats, char *buffer, int buffer_len);
int
main_program(int num_channels, int num_connections, const char *server_hostname, int server_port, int timeout)
{
struct sockaddr_in server_address;
int main_sd = -1, num_events = 0, i, j, event_mask, channels_per_connection, num, start_time = 0, iters_to_next_summary = 0;
Connection *connections = NULL, *connection;
Statistics stats = {0,0,0,0,0};
int exitcode = EXIT_SUCCESS;
struct epoll_event events[MAX_EVENTS];
char buffer[BIG_BUFFER_SIZE];
info("Subscriber starting up\n");
info("Subscriber: %d connections to %d channels on server: %s:%d\n", num_connections, num_channels, server_hostname, server_port);
if ((fill_server_address(server_hostname, server_port, &server_address)) != 0) {
error2("ERROR host name not found\n");
}
if ((main_sd = epoll_create(200 /* this size is not used on Linux kernel 2.6.8+ */)) < 0) {
error3("Failed %d creating main epoll socket\n", errno);
}
if ((connections = init_connections(num_connections, &server_address, main_sd)) == NULL) {
error2("Failed to create to connections\n");
}
stats.requested_connections = num_connections;
for (i = 0; i < num_connections; i++) {
connections[i].channel_start = 0;
connections[i].channel_end = num_channels - 1;
}
// infinite loop
debug("Entering Infinite Loop\n");
iters_to_next_summary = ITERATIONS_TILL_SUMMARY_PER_TIMEOUT/timeout;
for(;;) {
if ((num_events = epoll_wait(main_sd, events, MAX_EVENTS, timeout)) < 0) {
error3("epoll_wait failed\n");
}
for (i = 0; i < num_events; i++) {
event_mask = events[i].events;
connection = (Connection *)(events[i].data.ptr);
if (event_mask & EPOLLHUP) { // SERVER HUNG UP
debug("EPOLLHUP\n");
info("Server hung up on conncetion %d. Reconecting...\n", connection->index);
sleep(1);
reopen_connection(connection);
continue;
}
if (event_mask & EPOLLERR) {
debug("EPOLLERR\n");
info("Server returned an error on connection %d. Reconecting...\n", connection->index);
reopen_connection(connection);
continue;
}
if (event_mask & EPOLLIN) { // READ
debug("----------READ AVAILABLE-------\n");
if (connection->state == CONNECTED) {
read_response(connection, &stats, buffer, BIG_BUFFER_SIZE);
}
}
if (event_mask & EPOLLOUT) { // WRITE
debug("----------WRITE AVAILABLE-------\n");
if (start_time == 0) {
start_time = time(NULL);
}
if (connection->state == CONNECTING) {
connection->state = CONNECTED;
stats.connections++;
debug("Connection opened for index=%d\n", connection->index);
subscribe_channels(connection, &stats);
// remove write flag from event
if (change_connection(connection, EPOLLIN | EPOLLHUP) < 0) {
error2("Failed creating socket for connection = %d\n", connection->index);
}
}
}
}
if ((iters_to_next_summary-- <= 0)) {
iters_to_next_summary = ITERATIONS_TILL_SUMMARY_PER_TIMEOUT/timeout;
summary("Connections=%ld, Messages=%ld BytesRead=%ld Msg/Sec=%0.2f\n", stats.connections, stats.messages, stats.bytes_read, calc_message_per_second(stats.messages, start_time));
}
if (stats.connections == 0) {
num = 0;
for (j = 0; j < num_connections; j++) {
if (connections[i].state != CLOSED) {
num++;
break;
}
}
if (num == 0) {
exitcode = EXIT_SUCCESS;
goto exit;
}
}
}
exit:
if (connections != NULL) free(connections);
return exitcode;
}
void
subscribe_channels(Connection *connection, Statistics *stats)
{
char buffer[BUFFER_SIZE];
int len = 0, bytes_written = 0;
long i = 0;
len = sprintf(buffer, "GET /sub");
for (i = connection->channel_start; i <= connection->channel_end; i++) {
len += sprintf(buffer + len, "/my_channel_%ld", i);
}
len += sprintf(buffer + len, " HTTP/1.1\r\nHost: loadtest\r\n\r\n");
if (write_connection(connection, stats, buffer, len) == EXIT_FAILURE) {
reopen_connection(connection);
return;
}
}
void
read_response(Connection *connection, Statistics *stats, char *buffer, int buffer_len)
{
int bytes_read = 0, bad_count = 0, msg_count = 0, close_count = 0;
bytes_read = read(connection->sd, buffer, buffer_len - 1);
if (bytes_read < 0) {
error("Error reading from socket for connection %d\n", connection->index);
reopen_connection(connection);
return;
}
if (bytes_read == 0) { // server disconnected us
// reconnect
info("Server disconnected as requested %d.\n", connection->index);
close_connection(connection);
stats->connections--;
return;
}
stats->bytes_read += bytes_read;
buffer[bytes_read] = '\0';
debug("Read %d bytes\n", bytes_read);
trace("Read Message: %s\n", buffer);
bad_count = count_strinstr(buffer, "HTTP/1.1 4");
bad_count += count_strinstr(buffer, "HTTP/1.1 5");
if (bad_count > 0) {
info("Recevied error. Buffer is %s\n", buffer);
reopen_connection(connection);
return;
}
msg_count = count_strinstr(buffer, "**MSG**");
stats->messages += msg_count;
if ((close_count = count_strinstr(buffer, "**CLOSE**")) > 0) {
connection->channel_count += close_count;
info("%d Channel(s) has(have) been closed by server.\n", close_count);
if (connection->channel_count >= (connection->channel_end - connection->channel_start + 1)) {
info("Connection %d will be closed \n", connection->index);
close_connection(connection);
stats->connections--;
}
}
}
int
main(int argc, char **argv)
{
struct arg_int *channels = arg_int0("c", "channels", "<n>", "define number of channels (default is 1)");
struct arg_int *subscribers = arg_int0("s", "subscribers", "<n>", "define number of subscribers (default is 1)");
struct arg_str *server_name = arg_str0("S", "server", "<hostname>", "server hostname where messages will be published (default is \"127.0.0.1\")");
struct arg_int *server_port = arg_int0("P", "port", "<n>", "server port where messages will be published (default is 9080)");
struct arg_int *timeout = arg_int0(NULL, "timeout", "<n>", "timeout when waiting events on communication to the server (default is 1000)");
struct arg_int *verbose = arg_int0("v", "verbose", "<n>", "increase output messages detail (0 (default) - no messages, 1 - info messages, 2 - debug messages, 3 - trace messages");
struct arg_lit *help = arg_lit0(NULL, "help", "print this help and exit");
struct arg_lit *version = arg_lit0(NULL, "version", "print version information and exit");
struct arg_end *end = arg_end(20);
void* argtable[] = { channels, subscribers, server_name, server_port, timeout, verbose, help, version, end };
const char* progname = "subscriber";
int nerrors;
int exitcode = EXIT_SUCCESS;
/* verify the argtable[] entries were allocated sucessfully */
if (arg_nullcheck(argtable) != 0) {
/* NULL entries were detected, some allocations must have failed */
printf("%s: insufficient memory\n", progname);
exitcode = EXIT_FAILURE;
goto exit;
}
/* set any command line default values prior to parsing */
subscribers->ival[0] = DEFAULT_CONCURRENT_CONN;
channels->ival[0] = DEFAULT_NUM_CHANNELS;
server_name->sval[0] = DEFAULT_SERVER_HOSTNAME;
server_port->ival[0] = DEFAULT_SERVER_PORT;
timeout->ival[0] = DEFAULT_TIMEOUT;
verbose->ival[0] = 0;
/* Parse the command line as defined by argtable[] */
nerrors = arg_parse(argc, argv, argtable);
/* special case: '--help' takes precedence over error reporting */
if (help->count > 0) {
printf(DESCRIPTION_SUBSCRIBER, progname, VERSION, COPYRIGHT);
printf("Usage: %s", progname);
arg_print_syntax(stdout, argtable, "\n");
arg_print_glossary(stdout, argtable, " %-25s %s\n");
exitcode = EXIT_SUCCESS;
goto exit;
}
/* special case: '--version' takes precedence error reporting */
if (version->count > 0) {
printf(DESCRIPTION_SUBSCRIBER, progname, VERSION, COPYRIGHT);
exitcode = EXIT_SUCCESS;
goto exit;
}
/* If the parser returned any errors then display them and exit */
if (nerrors > 0) {
/* Display the error details contained in the arg_end struct.*/
arg_print_errors(stdout, end, progname);
printf("Try '%s --help' for more information.\n", progname);
exitcode = EXIT_FAILURE;
goto exit;
}
verbose_messages = verbose->ival[0];
/* normal case: take the command line options at face value */
exitcode = main_program(channels->ival[0], subscribers->ival[0], server_name->sval[0], server_port->ival[0], timeout->ival[0]);
exit:
/* deallocate each non-null entry in argtable[] */
arg_freetable(argtable, sizeof(argtable) / sizeof(argtable[0]));
return exitcode;
}
#include "util.h"
int
fill_server_address(const char *server_hostname, int server_port, struct sockaddr_in *server_address)
{
struct hostent *server = NULL;
if ((server = gethostbyname(server_hostname)) == NULL) {
return EXIT_FAILURE;
}
bzero((char *) server_address, sizeof(struct sockaddr_in));
server_address->sin_family = AF_INET;
memcpy((char *) &server_address->sin_addr.s_addr, (const char *) server->h_addr, server->h_length);
server_address->sin_port = htons(server_port);
return EXIT_SUCCESS;
}
Connection *
init_connections(int num_connections, struct sockaddr_in *server_address, int main_sd)
{
Connection *connections;
int i;
if ((connections = (Connection *) malloc(sizeof(Connection) * num_connections)) == NULL) {
return NULL;
}
for (i = 0; i < num_connections; ++i) {
connections[i].index = i;
connections[i].server_address = server_address;
connections[i].main_sd = main_sd;
if (open_connection(&connections[i]) != 0) {
error("Opening connection %d\n", i);
return NULL;
}
}
info("Added %d connections.\n", num_connections);
return connections;
}
int
open_connection(Connection *connection)
{
struct epoll_event anEvent;
int exitcode = EXIT_SUCCESS;
connection->state = CONNECTING;
connection->channel_id = -1;
connection->content_length = 0;
connection->channel_count = 0;
if ((connection->sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
error3("ERROR %d opening socket for connection %d\n", errno, connection->index);
}
// // set nonblocking
// int flags = fcntl(connection->sd, F_GETFL, 0);
// fcntl(connection->sd, F_SETFL, flags | O_NONBLOCK);
//
// int rc = connect(connection->sd, (struct sockaddr *) connection->server_address, sizeof(struct sockaddr_in));
// if ((rc < 0) && (errno != EINPROGRESS)) {
// error3("ERROR connecting to server on connection %d\n", connection->index);
// }
if (connect(connection->sd, (struct sockaddr *) connection->server_address, sizeof(struct sockaddr_in)) < 0) {
error3("ERROR connecting to server on connection %d\n", connection->index);
}
debug("Adding connection %d\n", connection->index);
anEvent.events = EPOLLIN | EPOLLOUT | EPOLLHUP;
anEvent.data.ptr = (void *) connection;
if (epoll_ctl(connection->main_sd, EPOLL_CTL_ADD, connection->sd, &anEvent) < 0) {
error3("ERROR %d Failed creating socket for connection %d\n", errno, connection->index);
}
debug("Connection opening for index %d\n", connection->index);
exit:
return exitcode;
}
void
close_connection(Connection *connection)
{
connection->state = CLOSED;
close(connection->sd);
}
int
reopen_connection(Connection *connection)
{
close_connection(connection);
return open_connection(connection);
}
int
change_connection(Connection *connection, uint32_t events)
{
struct epoll_event anEvent;
anEvent.events = events;
anEvent.data.ptr = (void *) connection;
return epoll_ctl(connection->main_sd, EPOLL_CTL_MOD, connection->sd, &anEvent);
}
int
write_connection(Connection *connection, Statistics *stats, char *buffer, int buffer_len)
{
int bytes_written = 0;
bytes_written = write(connection->sd, buffer, buffer_len);
if (bytes_written != buffer_len) {
error4("Error %d writing bytes (wrote=%d, wanted=%d) for connection %d\n", errno, bytes_written, buffer_len, connection->index);
return EXIT_FAILURE;
}
stats->bytes_written += bytes_written;
trace("Wrote %s\n", buffer);
return EXIT_SUCCESS;
}
float
calc_message_per_second(int num_messages, int start_time)
{
float ret_val = 0.0;
int now = time(NULL);
int diff = now - start_time;
if (diff == 0) {
diff = 1;
}
ret_val = (float) num_messages/diff;
info("CALC TIME. Messages=%d, Time=%d Avg=%0.2f\n", num_messages, diff, ret_val);
return ret_val;
}
int
count_strinstr(const char *big, const char *little)
{
const char *p;
int count = 0;
size_t lil_len = strlen(little);
/* you decide what to do here */
if (lil_len == 0)
return -1;
p = strstr(big, little);
while (p) {
count++;
p = strstr(p + lil_len, little);
}
return count;
}
#ifndef _UTIL_H_
#define _UTIL_H_
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#define error(...) {fprintf(stderr, __VA_ARGS__);}
#define error2(...) {error(__VA_ARGS__); exitcode = EXIT_FAILURE; goto exit;}
#define error3(...) {perror("ERROR");error(__VA_ARGS__); exitcode = EXIT_FAILURE; goto exit;}
#define error4(...) {perror("ERROR");error(__VA_ARGS__);}
#define info(...) {if (verbose_messages >= 1) {fprintf(stdout, __VA_ARGS__);}}
#define debug(...) {if (verbose_messages >= 2) {fprintf(stdout, __VA_ARGS__);}}
#define trace(...) {if (verbose_messages >= 3) {fprintf(stdout, __VA_ARGS__);}}
#define summary(...) fprintf(stdout, __VA_ARGS__)
#define VERSION "0.1"
#define COPYRIGHT "Copyright (C) 2011 Michael Costello, Wandenberg Peixoto <wandenberg@gmail.com>"
#define DESCRIPTION_PUBLISHER "'%s' v%s - program to publish messages to test Push Stream Module.\n%s\n"
#define DESCRIPTION_SUBSCRIBER "'%s' v%s - program to subscribe channels to test Push Stream Module.\n%s\n"
#define DEFAULT_NUM_MESSAGES 1
#define DEFAULT_CONCURRENT_CONN 1
#define DEFAULT_NUM_CHANNELS 1
#define DEFAULT_SERVER_HOSTNAME "127.0.0.1"
#define DEFAULT_SERVER_PORT 9080
#define DEFAULT_TIMEOUT 1000
#define MAX_EVENTS (60000 * 8)
#define ITERATIONS_TILL_SUMMARY_PER_TIMEOUT 10000 //timeout: 1000 -> summary each 10 seconds
#define BUFFER_SIZE 1024
#define BIG_BUFFER_SIZE 640000
typedef struct
{
long requested_connections;
long connections;
long messages;
long bytes_written;
long bytes_read;
} Statistics;
enum State {INIT=0, CONNECTING, CONNECTED, CLOSED};
// store per connection state here
typedef struct
{
int index;
int main_sd;
int sd;
int message_count;
int num_messages;
int channel_count;
long channel_id;
long channel_start;
long channel_end;
char content_buffer[BUFFER_SIZE];
int content_length;
enum State state;
struct sockaddr_in *server_address;
} Connection;
static int verbose_messages = 0;
int fill_server_address(const char *server_hostname, int server_port, struct sockaddr_in *server_address);
Connection *init_connections(int count, struct sockaddr_in *server_address, int main_sd);
int open_connection(Connection *connection);
void close_connection(Connection *connection);
int reopen_connection(Connection *connection);
int write_connection(Connection *connection, Statistics *stats, char *buffer, int buffer_len);
int change_connection(Connection *connection, uint32_t events);
float calc_message_per_second(int num_messages, int start_time);
#endif /* _UTIL_H_ */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment