Commit 211edf51 authored by Wandenberg Peixoto's avatar Wandenberg Peixoto Committed by Wandenberg

added the feature to send a custom 'channel delete message' on the body of the DELETE request

parent b75a4baa
h1(#changelog). Changelog h1(#changelog). Changelog
* Added the feature to send a custom 'channel delete message' on the body of the DELETE request
* Changed push_stream_channel_id variable to directive, and make possible set it inside an if block * Changed push_stream_channel_id variable to directive, and make possible set it inside an if block
* Changed push_stream_channels_path variable to directive, and make possible set it inside an if block * Changed push_stream_channels_path variable to directive, and make possible set it inside an if block
* Back to use Nginx chunked filter * Back to use Nginx chunked filter
......
...@@ -31,5 +31,6 @@ ...@@ -31,5 +31,6 @@
static ngx_int_t ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r); static ngx_int_t ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_publisher_handler(ngx_http_request_t *r); static ngx_int_t ngx_http_push_stream_publisher_handler(ngx_http_request_t *r);
static void ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r); static void ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r);
static void ngx_http_push_stream_publisher_delete_handler(ngx_http_request_t *r);
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_PUBLISHER_H_ */ #endif /* NGX_HTTP_PUSH_STREAM_MODULE_PUBLISHER_H_ */
...@@ -267,7 +267,7 @@ static void ngx_http_push_stream_worker_subscriber_cleanup_locke ...@@ -267,7 +267,7 @@ static void ngx_http_push_stream_worker_subscriber_cleanup_locke
static ngx_str_t * ngx_http_push_stream_create_str(ngx_pool_t *pool, uint len); static ngx_str_t * ngx_http_push_stream_create_str(ngx_pool_t *pool, uint len);
static void ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg); static void ngx_http_push_stream_mark_message_to_delete_locked(ngx_http_push_stream_msg_t *msg);
static void ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool); static void ngx_http_push_stream_delete_channel(ngx_str_t *id, u_char *text, size_t len, ngx_pool_t *temp_pool);
static void ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force); static void ngx_http_push_stream_collect_expired_messages(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force);
static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force); static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool, ngx_flag_t force);
static void ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg); static void ngx_http_push_stream_free_message_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg);
......
...@@ -562,6 +562,60 @@ describe "Publisher Properties" do ...@@ -562,6 +562,60 @@ describe "Publisher Properties" do
end end
end end
it "should delete a channel with a custom message" do
channel = 'test_delete_channel_whith_subscriber_in_one_channel'
body = 'published message'
configuration = config.merge({
:header_template => " ", # send a space as header to has a chunk received
:footer_template => nil,
:ping_message_interval => nil,
:message_template => '{\"id\":\"~id~\", \"channel\":\"~channel~\", \"text\":\"~text~\"}'
})
resp = ""
nginx_run_server(configuration, :timeout => 5) do |conf|
EventMachine.run do
sub_1 = EventMachine::HttpRequest.new(nginx_address + '/sub/' + channel.to_s).get :head => headers, :timeout => 30
sub_1.stream do |chunk|
resp = resp + chunk
if resp.strip.empty?
stats = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => {'accept' => 'application/json'}, :timeout => 30
stats.callback do
stats.response_header.status.should eql(200)
stats.response_header.content_length.should_not eql(0)
response = JSON.parse(stats.response)
response["subscribers"].to_i.should eql(1)
response["channels"].to_i.should eql(1)
pub = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).delete :head => headers, :body => "custom channel delete message", :timeout => 30
pub.callback do
pub.response_header.status.should eql(200)
pub.response_header.content_length.should eql(0)
pub.response_header['X_NGINX_PUSHSTREAM_EXPLAIN'].should eql("Channel deleted.")
end
end
else
response = JSON.parse(resp)
response["channel"].should eql(channel)
response["id"].to_i.should eql(-2)
response["text"].should eql("custom channel delete message")
stats = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => {'accept' => 'application/json'}, :timeout => 30
stats.callback do
stats.response_header.status.should eql(200)
stats.response_header.content_length.should_not eql(0)
response = JSON.parse(stats.response)
response["subscribers"].to_i.should eql(0)
response["channels"].to_i.should eql(0)
end
EventMachine.stop
end
end
end
end
end
it "should delete a channel with subscriber in two channels" do it "should delete a channel with subscriber in two channels" do
channel_1 = 'test_delete_channel_whith_subscriber_in_two_channels_1' channel_1 = 'test_delete_channel_whith_subscriber_in_two_channels_1'
channel_2 = 'test_delete_channel_whith_subscriber_in_two_channels_2' channel_2 = 'test_delete_channel_whith_subscriber_in_two_channels_2'
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include <ngx_http_push_stream_module_version.h> #include <ngx_http_push_stream_module_version.h>
static ngx_int_t ngx_http_push_stream_publisher_handle_post(ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_str_t *id); static ngx_int_t ngx_http_push_stream_publisher_handle_post(ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_str_t *id);
static ngx_int_t ngx_http_push_stream_publisher_handle_delete(ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_str_t *id);
static ngx_int_t static ngx_int_t
ngx_http_push_stream_publisher_handler(ngx_http_request_t *r) ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
...@@ -80,13 +81,34 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r) ...@@ -80,13 +81,34 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
} }
if ((cf->location_type == NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN) && (r->method == NGX_HTTP_DELETE)) { if ((cf->location_type == NGX_HTTP_PUSH_STREAM_PUBLISHER_MODE_ADMIN) && (r->method == NGX_HTTP_DELETE)) {
ngx_http_push_stream_delete_channel(id, r->pool); return ngx_http_push_stream_publisher_handle_delete(cf, r, id);
return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_OK, &NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED);
} }
return ngx_http_push_stream_send_response_channel_info(r, channel); return ngx_http_push_stream_send_response_channel_info(r, channel);
} }
static ngx_int_t
ngx_http_push_stream_publisher_handle_delete(ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_str_t *id)
{
ngx_int_t rc;
/*
* Instruct ngx_http_read_subscriber_request_body to store the request
* body entirely in a memory buffer or in a file.
*/
r->request_body_in_single_buf = 0;
r->request_body_in_persistent_file = 1;
r->request_body_in_clean_file = 0;
r->request_body_file_log_level = 0;
// parse the body message and return
rc = ngx_http_read_client_request_body(r, ngx_http_push_stream_publisher_delete_handler);
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
return NGX_DONE;
}
static ngx_int_t static ngx_int_t
ngx_http_push_stream_publisher_handle_post(ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_str_t *id) ngx_http_push_stream_publisher_handle_post(ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_str_t *id)
...@@ -129,37 +151,16 @@ ngx_http_push_stream_publisher_handle_post(ngx_http_push_stream_loc_conf_t *cf, ...@@ -129,37 +151,16 @@ ngx_http_push_stream_publisher_handle_post(ngx_http_push_stream_loc_conf_t *cf,
return NGX_DONE; return NGX_DONE;
} }
static void static ngx_buf_t *
ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ngx_http_push_stream_read_request_body_to_buffer(ngx_http_request_t *r)
{ {
ngx_str_t *id;
ngx_str_t *event_id, *event_type;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_buf_t *buf = NULL; ngx_buf_t *buf = NULL;
ngx_chain_t *chain; ngx_chain_t *chain;
ngx_http_push_stream_channel_t *channel;
ssize_t n; ssize_t n;
off_t len; off_t len;
// check if body message wasn't empty
if (r->headers_in.content_length_n <= 0) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: Post request was sent with no message");
ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_EMPTY_POST_REQUEST_MESSAGE);
return;
}
// get and check if has access to request body
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(r->request_body->bufs, NULL, r, "push stream module: unexpected publisher message request body buffer location. please report this to the push stream module developers.");
// get and check channel id value
id = ngx_http_push_stream_get_channel_id(r, cf);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NULL, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without channel id");
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without channel id");
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler with channel id too large");
// copy request body to a memory buffer
buf = ngx_create_temp_buf(r->pool, r->headers_in.content_length_n + 1); buf = ngx_create_temp_buf(r->pool, r->headers_in.content_length_n + 1);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(buf, NULL, r, "push stream module: cannot allocate memory for read the message"); if (buf != NULL) {
ngx_memset(buf->start, '\0', r->headers_in.content_length_n + 1); ngx_memset(buf->start, '\0', r->headers_in.content_length_n + 1);
chain = r->request_body->bufs; chain = r->request_body->bufs;
...@@ -176,8 +177,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -176,8 +177,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
n = ngx_read_file(chain->buf->file, buf->start, len, 0); n = ngx_read_file(chain->buf->file, buf->start, len, 0);
if (n == NGX_FILE_ERROR) { if (n == NGX_FILE_ERROR) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: cannot read file with request body"); ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: cannot read file with request body");
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return NULL;
return;
} }
buf->last = buf->last + len; buf->last = buf->last + len;
ngx_delete_file(chain->buf->file->name.data); ngx_delete_file(chain->buf->file->name.data);
...@@ -189,6 +189,69 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r) ...@@ -189,6 +189,69 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
chain = chain->next; chain = chain->next;
buf->start = buf->last; buf->start = buf->last;
} }
}
return buf;
}
static void
ngx_http_push_stream_publisher_delete_handler(ngx_http_request_t *r)
{
ngx_str_t *id;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_buf_t *buf = NULL;
u_char *text = ngx_http_push_stream_module_main_conf->channel_deleted_message_text.data;
size_t len = ngx_http_push_stream_module_main_conf->channel_deleted_message_text.len;
if (r->headers_in.content_length_n > 0) {
// get and check if has access to request body
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(r->request_body->bufs, NULL, r, "push stream module: unexpected publisher message request body buffer location. please report this to the push stream module developers.");
buf = ngx_http_push_stream_read_request_body_to_buffer(r);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(buf, NULL, r, "push stream module: cannot allocate memory for read the message");
text = buf->pos;
len = ngx_buf_size(buf);
}
// get and check channel id value
id = ngx_http_push_stream_get_channel_id(r, cf);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NULL, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without channel id");
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without channel id");
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler with channel id too large");
ngx_http_push_stream_delete_channel(id, text, len, r->pool);
ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_OK, &NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED);
}
static void
ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
{
ngx_str_t *id;
ngx_str_t *event_id, *event_type;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_buf_t *buf = NULL;
ngx_http_push_stream_channel_t *channel;
// check if body message wasn't empty
if (r->headers_in.content_length_n <= 0) {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: Post request was sent with no message");
ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_BAD_REQUEST, &NGX_HTTP_PUSH_STREAM_EMPTY_POST_REQUEST_MESSAGE);
return;
}
// get and check if has access to request body
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(r->request_body->bufs, NULL, r, "push stream module: unexpected publisher message request body buffer location. please report this to the push stream module developers.");
// get and check channel id value
id = ngx_http_push_stream_get_channel_id(r, cf);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NULL, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without channel id");
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_UNSET_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler without channel id");
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(id, NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID, r, "push stream module: something goes very wrong, arrived on ngx_http_push_stream_publisher_body_handler with channel id too large");
// copy request body to a memory buffer
buf = ngx_http_push_stream_read_request_body_to_buffer(r);
NGX_HTTP_PUSH_STREAM_CHECK_AND_FINALIZE_REQUEST_ON_ERROR(buf, NULL, r, "push stream module: cannot allocate memory for read the message");
event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID); event_id = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_ID);
event_type = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_TYPE); event_type = ngx_http_push_stream_get_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_EVENT_TYPE);
......
...@@ -648,7 +648,7 @@ ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_ ...@@ -648,7 +648,7 @@ ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_
} }
static void static void
ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool) ngx_http_push_stream_delete_channel(ngx_str_t *id, u_char *text, size_t len, ngx_pool_t *temp_pool)
{ {
ngx_http_push_stream_channel_t *channel; ngx_http_push_stream_channel_t *channel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr; ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
...@@ -675,7 +675,7 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool) ...@@ -675,7 +675,7 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool)
ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, 0, 0); ngx_http_push_stream_ensure_qtd_of_messages_locked(channel, 0, 0);
// apply channel deleted message text to message template // apply channel deleted message text to message template
if ((channel->channel_deleted_message = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(ngx_http_push_stream_module_main_conf->channel_deleted_message_text.data, ngx_http_push_stream_module_main_conf->channel_deleted_message_text.len, channel, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID, NULL, NULL, temp_pool)) == NULL) { if ((channel->channel_deleted_message = ngx_http_push_stream_convert_char_to_msg_on_shared_locked(text, len, channel, NGX_HTTP_PUSH_STREAM_CHANNEL_DELETED_MESSAGE_ID, NULL, NULL, temp_pool)) == NULL) {
ngx_shmtx_unlock(&(shpool)->mutex); ngx_shmtx_unlock(&(shpool)->mutex);
ngx_log_error(NGX_LOG_ERR, temp_pool->log, 0, "push stream module: unable to allocate memory to channel deleted message"); ngx_log_error(NGX_LOG_ERR, temp_pool->log, 0, "push stream module: unable to allocate memory to channel deleted message");
return; return;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment