Commit 5bfe46a3 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

refactor on add message to channel to function ngx_http_push_stream_add_msg_to_channel

parent f8e7af9d
...@@ -211,7 +211,6 @@ ngx_event_t ngx_http_push_stream_buffer_cleanup_event; ...@@ -211,7 +211,6 @@ ngx_event_t ngx_http_push_stream_buffer_cleanup_event;
ngx_http_push_stream_msg_t *ngx_http_push_stream_ping_msg = NULL; ngx_http_push_stream_msg_t *ngx_http_push_stream_ping_msg = NULL;
// general request handling // general request handling
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_pool_t *temp_pool);
ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_pool_t *temp_pool); ngx_http_push_stream_msg_t *ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_add_polling_headers(ngx_http_request_t *r, time_t last_modified_time, ngx_int_t tag, ngx_pool_t *temp_pool); static void ngx_http_push_stream_add_polling_headers(ngx_http_request_t *r, time_t last_modified_time, ngx_int_t tag, ngx_pool_t *temp_pool);
static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value); static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value);
...@@ -230,6 +229,8 @@ static void ngx_http_push_stream_send_response_finalize_for_long ...@@ -230,6 +229,8 @@ static void ngx_http_push_stream_send_response_finalize_for_long
static ngx_int_t ngx_http_push_stream_memory_cleanup(); static ngx_int_t ngx_http_push_stream_memory_cleanup();
static ngx_int_t ngx_http_push_stream_buffer_cleanup(); static ngx_int_t ngx_http_push_stream_buffer_cleanup();
ngx_http_push_stream_channel_t *ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_char *data, size_t len, ngx_str_t *event_id, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev); static void ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev); static void ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_memory_cleanup_timer_wake_handler(ngx_event_t *ev); static void ngx_http_push_stream_memory_cleanup_timer_wake_handler(ngx_event_t *ev);
......
...@@ -129,13 +129,11 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -129,13 +129,11 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
ngx_str_t *id; ngx_str_t *id;
ngx_str_t *event_id; ngx_str_t *event_id;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_buf_t *buf = NULL; ngx_buf_t *buf = NULL;
ngx_chain_t *chain; ngx_chain_t *chain;
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ssize_t n; ssize_t n;
off_t len; off_t len;
ngx_http_push_stream_msg_t *msg;
// check if body message wasn't empty // check if body message wasn't empty
if (r->headers_in.content_length_n <= 0) { if (r->headers_in.content_length_n <= 0) {
...@@ -187,49 +185,12 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -187,49 +185,12 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID); event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID);
ngx_shmtx_lock(&shpool->mutex); channel = ngx_http_push_stream_add_msg_to_channel(r, id, buf->pos, ngx_buf_size(buf), event_id, r->pool);
// just find the channel. if it's not there, NULL and return error.
channel = ngx_http_push_stream_find_channel_locked(id, r->connection->log);
if (channel == NULL) { if (channel == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without created channel %s", id->data);
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return; return;
} }
// create a buffer copy in shared mem
msg = ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(buf, channel, channel->last_message_id + 1, event_id, r->pool);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR_LOCKED(msg, NULL, r, "push stream module: unable to allocate message in shared memory");
channel->last_message_id++;
((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->published_messages++;
// tag message with time stamp and a sequence tag
msg->time = ngx_time();
msg->tag = (msg->time == channel->last_message_time) ? (channel->last_message_tag + 1) : 0;
channel->last_message_time = msg->time;
channel->last_message_tag = msg->tag;
// set message expiration time
msg->expires = (ngx_http_push_stream_module_main_conf->message_ttl == NGX_CONF_UNSET ? 0 : (ngx_time() + ngx_http_push_stream_module_main_conf->message_ttl));
// put messages on the queue
if (cf->store_messages) {
ngx_queue_insert_tail(&channel->message_queue.queue, &msg->queue);
channel->stored_messages++;
// now see if the queue is too big
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, ngx_http_push_stream_module_main_conf->max_messages_stored_per_channel, 0);
}
ngx_shmtx_unlock(&shpool->mutex);
// send an alert to workers
ngx_http_push_stream_broadcast(channel, msg, r->connection->log);
// turn on timer to cleanup buffer of old messages
ngx_http_push_stream_buffer_cleanup_timer_set(cf);
ngx_http_push_stream_send_response_channel_info(r, channel); ngx_http_push_stream_send_response_channel_info(r, channel);
ngx_http_finalize_request(r, NGX_HTTP_OK); ngx_http_finalize_request(r, NGX_HTTP_OK);
return; return;
......
...@@ -144,13 +144,6 @@ ngx_http_push_stream_delete_worker_channel(void) ...@@ -144,13 +144,6 @@ ngx_http_push_stream_delete_worker_channel(void)
} }
ngx_http_push_stream_msg_t *
ngx_http_push_stream_convert_buffer_to_msg_on_shared_locked(ngx_buf_t *buf, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_pool_t *temp_pool)
{
return ngx_http_push_stream_convert_char_to_msg_on_shared_locked(buf->pos, ngx_buf_size(buf), channel, id, event_id, temp_pool);
}
ngx_http_push_stream_msg_t * ngx_http_push_stream_msg_t *
ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_pool_t *temp_pool) ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t len, ngx_http_push_stream_channel_t *channel, ngx_int_t id, ngx_str_t *event_id, ngx_pool_t *temp_pool)
{ {
...@@ -262,6 +255,64 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l ...@@ -262,6 +255,64 @@ ngx_http_push_stream_convert_char_to_msg_on_shared_locked(u_char *data, size_t l
} }
ngx_http_push_stream_channel_t *
ngx_http_push_stream_add_msg_to_channel(ngx_http_request_t *r, ngx_str_t *id, u_char *data, size_t len, ngx_str_t *event_id, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_msg_t *msg;
ngx_shmtx_lock(&shpool->mutex);
// just find the channel. if it's not there, NULL and return error.
channel = ngx_http_push_stream_find_channel_locked(id, r->connection->log);
if (channel == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without created channel %s", id->data);
return NULL;
}
// create a buffer copy in shared mem
msg = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(data, len, channel, channel->last_message_id + 1, event_id, temp_pool);
if (msg == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: unable to allocate message in shared memory");
return NULL;
}
channel->last_message_id++;
((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->published_messages++;
// tag message with time stamp and a sequence tag
msg->time = ngx_time();
msg->tag = (msg->time == channel->last_message_time) ? (channel->last_message_tag + 1) : 0;
channel->last_message_time = msg->time;
channel->last_message_tag = msg->tag;
// set message expiration time
msg->expires = (ngx_http_push_stream_module_main_conf->message_ttl == NGX_CONF_UNSET ? 0 : (ngx_time() + ngx_http_push_stream_module_main_conf->message_ttl));
// put messages on the queue
if (cf->store_messages) {
ngx_queue_insert_tail(&channel->message_queue.queue, &msg->queue);
channel->stored_messages++;
// now see if the queue is too big
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, ngx_http_push_stream_module_main_conf->max_messages_stored_per_channel, 0);
}
ngx_shmtx_unlock(&shpool->mutex);
// send an alert to workers
ngx_http_push_stream_broadcast(channel, msg, r->connection->log);
// turn on timer to cleanup buffer of old messages
ngx_http_push_stream_buffer_cleanup_timer_set(cf);
return channel;
}
static ngx_int_t static ngx_int_t
ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t status_code, const ngx_str_t *explain_error_message) ngx_http_push_stream_send_only_header_response(ngx_http_request_t *r, ngx_int_t status_code, const ngx_str_t *explain_error_message)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment