Commit a497d59a authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

change test tools to run in loop

parent 1fe628e7
......@@ -34,7 +34,7 @@ http {
client_body_buffer_size 1k;
ignore_invalid_headers on;
push_stream_shared_memory_size 10m;
push_stream_shared_memory_size 100m;
push_stream_max_channel_id_length 200;
# max messages to store in memory
push_stream_max_messages_stored_per_channel 20;
......@@ -81,7 +81,9 @@ http {
set $push_stream_channel_id $arg_id;
# store messages in memory
push_stream_store_messages on;
push_stream_store_messages off;
push_stream_keepalive on;
# Message size limit
# client_max_body_size MUST be equal to client_body_buffer_size or
......
......@@ -11,7 +11,7 @@ void write_message(Connection *connection, Statistics *stats);
void read_response(Connection *connection, Statistics *stats, char *buffer, int buffer_len);
int
main_program(int num_messages, int num_channels, int num_connections, const char *server_hostname, int server_port, int timeout)
main_program(int num_messages, int num_channels, int num_connections, int loop_interval, const char *server_hostname, int server_port, int timeout)
{
struct sockaddr_in server_address;
int main_sd = -1, num_events = 0, i, event_mask, channels_per_connection, num, start_time = 0, iters_to_next_summary = 0;
......@@ -120,12 +120,27 @@ main_program(int num_messages, int num_channels, int num_connections, const char
if (stats.messages >= (num_channels * num_messages)) {
summary("Connections=%ld, Messages=%ld BytesWritten=%ld Msg/Sec=%0.2f\n", stats.connections, stats.messages, stats.bytes_written, calc_message_per_second(stats.messages, start_time));
for (i = 0; i < num_connections; i++) {
close_connection(&connections[i]);
stats.connections--;
if (loop_interval != 0) {
stats.messages = 0;
for (i = 0; i < num_connections; i++) {
connections[i].message_count = 0;
connections[i].channel_id = -1;
if (change_connection(&connections[i], EPOLLIN | EPOLLHUP | EPOLLOUT) < 0) {
error2("Failed creating socket for connection = %d\n", connection->index);
}
}
sleep(loop_interval);
} else {
for (i = 0; i < num_connections; i++) {
close_connection(&connections[i]);
stats.connections--;
}
exitcode = EXIT_SUCCESS;
goto exit;
}
exitcode = EXIT_SUCCESS;
goto exit;
}
}
......@@ -192,7 +207,7 @@ read_response(Connection *connection, Statistics *stats, char *buffer, int buffe
bad_count += count_strinstr(buffer, "HTTP/1.1 5");
if (bad_count > 0) {
info("Recevied error. Buffer is %s\n", buffer);
info("Recevied error. Buffer is %s\n", buffer);
reopen_connection(connection);
return;
}
......@@ -209,6 +224,8 @@ main(int argc, char **argv)
struct arg_int *channels = arg_int0("c", "channels", "<n>", "define number of channels (default is 1)");
struct arg_int *publishers = arg_int0("p", "publishers", "<n>", "define number of publishers (default is 1)");
struct arg_int *loop_interval = arg_int0("l", "loop_interval", "<n>", "define the interval between loops in seconds. each loop send the specified number of messages (default is 0, not loop)");
struct arg_str *server_name = arg_str0("S", "server", "<hostname>", "server hostname where messages will be published (default is \"127.0.0.1\")");
struct arg_int *server_port = arg_int0("P", "port", "<n>", "server port where messages will be published (default is 9080)");
......@@ -219,7 +236,7 @@ main(int argc, char **argv)
struct arg_lit *version = arg_lit0(NULL, "version", "print version information and exit");
struct arg_end *end = arg_end(20);
void* argtable[] = { messages, channels, publishers, server_name, server_port, timeout, verbose, help, version, end };
void* argtable[] = { messages, channels, publishers, loop_interval, server_name, server_port, timeout, verbose, help, version, end };
const char* progname = "publisher";
int nerrors;
......@@ -237,6 +254,7 @@ main(int argc, char **argv)
messages->ival[0] = DEFAULT_NUM_MESSAGES;
publishers->ival[0] = DEFAULT_CONCURRENT_CONN;
channels->ival[0] = DEFAULT_NUM_CHANNELS;
loop_interval->ival[0] = 0;
server_name->sval[0] = DEFAULT_SERVER_HOSTNAME;
server_port->ival[0] = DEFAULT_SERVER_PORT;
timeout->ival[0] = DEFAULT_TIMEOUT;
......@@ -280,7 +298,7 @@ main(int argc, char **argv)
verbose_messages = verbose->ival[0];
/* normal case: take the command line options at face value */
exitcode = main_program(messages->ival[0], channels->ival[0], publishers->ival[0], server_name->sval[0], server_port->ival[0], timeout->ival[0]);
exitcode = main_program(messages->ival[0], channels->ival[0], publishers->ival[0], loop_interval->ival[0], server_name->sval[0], server_port->ival[0], timeout->ival[0]);
exit:
/* deallocate each non-null entry in argtable[] */
......
......@@ -61,6 +61,7 @@ main_program(int num_channels, int num_connections, const char *server_hostname,
debug("EPOLLHUP\n");
info("Server hung up on conncetion %d. Reconecting...\n", connection->index);
sleep(1);
stats.connections--;
reopen_connection(connection);
continue;
......@@ -69,6 +70,7 @@ main_program(int num_channels, int num_connections, const char *server_hostname,
if (event_mask & EPOLLERR) {
debug("EPOLLERR\n");
info("Server returned an error on connection %d. Reconecting...\n", connection->index);
stats.connections--;
reopen_connection(connection);
continue;
......@@ -110,18 +112,18 @@ main_program(int num_channels, int num_connections, const char *server_hostname,
}
if (stats.connections == 0) {
num = 0;
for (j = 0; j < num_connections; j++) {
if (connections[i].state != CLOSED) {
num++;
break;
}
}
if (num == 0) {
exitcode = EXIT_SUCCESS;
goto exit;
}
num = 0;
for (j = 0; j < num_connections; j++) {
if (connections[i].state != CLOSED) {
num++;
break;
}
}
if (num == 0) {
exitcode = EXIT_SUCCESS;
goto exit;
}
}
}
......@@ -147,6 +149,7 @@ subscribe_channels(Connection *connection, Statistics *stats)
len += sprintf(buffer + len, "?conn=%d HTTP/1.1\r\nHost: loadtest\r\n\r\n", connection->index);
if (write_connection(connection, stats, buffer, len) == EXIT_FAILURE) {
stats->connections--;
reopen_connection(connection);
return;
}
......@@ -162,6 +165,7 @@ read_response(Connection *connection, Statistics *stats, char *buffer, int buffe
if (bytes_read < 0) {
error("Error reading from socket for connection %d\n", connection->index);
stats->connections--;
reopen_connection(connection);
return;
}
......@@ -169,8 +173,8 @@ read_response(Connection *connection, Statistics *stats, char *buffer, int buffe
if (bytes_read == 0) { // server disconnected us
// reconnect
info("Server disconnected as requested %d.\n", connection->index);
close_connection(connection);
stats->connections--;
stats->connections--;
reopen_connection(connection);
return;
}
......@@ -183,7 +187,8 @@ read_response(Connection *connection, Statistics *stats, char *buffer, int buffe
bad_count += count_strinstr(buffer, "HTTP/1.1 5");
if (bad_count > 0) {
info("Recevied error. Buffer is %s\n", buffer);
info("Recevied error. Buffer is %s\n", buffer);
stats->connections--;
reopen_connection(connection);
return;
}
......@@ -192,13 +197,13 @@ read_response(Connection *connection, Statistics *stats, char *buffer, int buffe
stats->messages += msg_count;
if ((close_count = count_strinstr(buffer, "**CLOSE**")) > 0) {
connection->channel_count += close_count;
info("%d Channel(s) has(have) been closed by server.\n", close_count);
if (connection->channel_count >= (connection->channel_end - connection->channel_start + 1)) {
info("Connection %d will be closed \n", connection->index);
close_connection(connection);
stats->connections--;
}
connection->channel_count += close_count;
info("%d Channel(s) has(have) been closed by server.\n", close_count);
if (connection->channel_count >= (connection->channel_end - connection->channel_start + 1)) {
info("Connection %d will be closed \n", connection->index);
close_connection(connection);
stats->connections--;
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment