Commit 64b9bc3f authored by Wandenberg Peixoto's avatar Wandenberg Peixoto

Initial commit

parents
build/
.cproject
.project
h1. Nginx Push Stream Module
A pure stream http push technology for your Nginx setup.
Comet made easy and *really scalable*.
h2. Installing
<pre>
<code>
# clone
git clone http://github.com/wandenberg/nginx-push-stream-module.git
cd nginx-push-stream-module
# build 0.7.67
./build.sh master 0.7.67
cd build/nginx-0.7.67
# or build 0.8.53
./build.sh master 0.8.53
cd build/nginx-0.8.53
# finish
sudo make install
# checking
sudo /usr/local/nginx/sbin/nginx -v
nginx version: nginx/0.8.53
sudo /usr/local/nginx/sbin/nginx -c nginx-push-stream-module/misc/nginx.conf -t
the configuration file nginx-push-stream-module/misc/nginx.conf syntax is ok
configuration file nginx-push-stream-module/misc/nginx.conf test is successful
# running
sudo /usr/local/nginx/sbin/nginx -c nginx-push-stream-module/misc/nginx.conf
</code>
</pre>
h2. Basic Configuration
<pre>
<code>
location /pub {
# activate publisher mode for this location
push_stream_publisher;
# query string based channel id
set $push_stream_channel_id $arg_id;
# message template
push_stream_message_template "<script>p(~id~,'~channel~','~text~');</script>";
# max messages to store in memory
push_stream_max_message_buffer_length 20;
# message ttl
push_stream_min_message_buffer_timeout 5m;
# client_max_body_size MUST be equal to client_body_buffer_size or
# you will be sorry.
client_max_body_size 32k;
client_body_buffer_size 32k;
}
location ~ /sub/(.*) {
# activate subscriber mode for this location
push_stream_subscriber;
# positional channel path
set $push_stream_channels_path $1;
# header to be sent when receiving new subscriber connection
push_stream_header_template "<html><head><meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\">\r\n<meta http-equiv=\"Cache-Control\" content=\"no-store\">\r\n<meta http-equiv=\"Cache-Control\" content=\"no-cache\">\r\n<meta http-equiv=\"Pragma\" content=\"no-cache\">\r\n<meta http-equiv=\"Expires\" content=\"Thu, 1 Jan 1970 00:00:00 GMT\">\r\n<script type=\"text/javascript\">\r\nwindow.onError = null;\r\ndocument.domain = 'localhost';\r\nparent.PushStream.register(this);\r\n</script>\r\n</head>\r\n<body onload=\"try { parent.PushStream.reset(this) } catch (e) {}\">";
# message template
push_stream_message_template "<script>p(~id~,'~channel~','~text~');</script>";
# content-type
push_stream_content_type "text/html; charset=utf-8";
# subscriber may create channels on demand or only authorized
# (publisher) may do it?
push_stream_authorized_channels_only off;
# ping frequency
push_stream_ping_message_interval 10s;
# disconnection candidates test frequency
push_stream_subscriber_disconnect_interval 30s;
# connection ttl to enable recycle
push_stream_subscriber_connection_timeout 15m;
# solving some leakage problem with persistent connections in
# Nginx's chunked filter (ngx_http_chunked_filter_module.c)
chunked_transfer_encoding off;
}
</code>
</pre>
h2. Basic Usage
You can feel the flavor right now at the command line. Try using more than
one terminal and start playing http pubsub:
<pre>
<code>
# Pub
curl -s -v -X POST "http://localhost/pub?id=my_channel_1" -d "Hello World!"
# Sub
curl -s -v "http://localhost/sub/my_channel_1.b20"
# Channel Stats (text format)
curl -s -v "http://localhost/pub?id=my_channel_1"
# All Channels Stats (json format)
curl -s -v "http://localhost/pub?id=ALL"
</code>
</pre>
#!/bin/bash
TAG="$1"
NGINX_VERSION="$2"
PREFIX="nginx-push-stream-module"
CONFIGURE_OPTIONS="--without-select_module \
--without-poll_module \
--without-http_charset_module \
--without-http_ssi_module \
--without-http_auth_basic_module \
--without-http_autoindex_module \
--without-http_geo_module \
--without-http_map_module \
--without-http_referer_module \
--without-http_fastcgi_module \
--without-http_memcached_module \
--without-http_limit_zone_module \
--without-http_limit_req_module \
--without-http_empty_gif_module \
--without-http_browser_module \
--without-http_upstream_ip_hash_module \
--without-mail_pop3_module \
--without-mail_imap_module \
--without-mail_smtp_module \
--with-http_stub_status_module \
--add-module=nginx-push-stream-module"
if [[ -z "$TAG" || -z "$NGINX_VERSION" ]]
then
echo "Usage: $0 <tag> <nginx_version>"
exit 1
fi
(./pack.sh $TAG && \
cd build && \
rm -rf nginx-${NGINX_VERSION}* && \
wget http://sysoev.ru/nginx/nginx-${NGINX_VERSION}.tar.gz && \
tar xzvf nginx-${NGINX_VERSION}.tar.gz && \
cd nginx-$NGINX_VERSION && \
tar xzvf ../$PREFIX-$TAG.tar.gz && \
./configure $CONFIGURE_OPTIONS && \
make && \
echo "
##############################################################
Build generated: build/nginx-$NGINX_VERSION
Configure options used:
$CONFIGURE_OPTIONS
To finish the process:
cd build/nginx-$NGINX_VERSION
sudo make install") || \
(echo "There was a problem building the module" ; exit 1)
echo "##############################################################"
ngx_feature="http_push_stream_module"
ngx_feature_name=
ngx_feature_run=no
ngx_feature_incs=
ngx_feature_path=
ngx_feature_libs=
ngx_feature_test=
ngx_addon_name=ngx_http_push_stream_module
HTTP_MODULES="$HTTP_MODULES ngx_http_push_stream_module"
CORE_INCS="$CORE_INCS \
$ngx_addon_dir/src"
NGX_ADDON_SRCS="$NGX_ADDON_SRCS \
${ngx_addon_dir}/src/ngx_http_push_stream_module.c"
have=NGX_HTTP_HEADERS . auto/have
. auto/feature
types {
text/html html htm shtml;
text/css css;
text/xml xml;
image/gif gif;
image/jpeg jpeg jpg;
application/x-javascript js;
application/atom+xml atom;
application/rss+xml rss;
text/mathml mml;
text/plain txt;
text/vnd.sun.j2me.app-descriptor jad;
text/vnd.wap.wml wml;
text/x-component htc;
image/png png;
image/tiff tif tiff;
image/vnd.wap.wbmp wbmp;
image/x-icon ico;
image/x-jng jng;
image/x-ms-bmp bmp;
image/svg+xml svg;
application/java-archive jar war ear;
application/mac-binhex40 hqx;
application/msword doc;
application/pdf pdf;
application/postscript ps eps ai;
application/rtf rtf;
application/vnd.ms-excel xls;
application/vnd.ms-powerpoint ppt;
application/vnd.wap.wmlc wmlc;
application/vnd.wap.xhtml+xml xhtml;
application/vnd.google-earth.kml+xml kml;
application/vnd.google-earth.kmz kmz;
application/x-cocoa cco;
application/x-java-archive-diff jardiff;
application/x-java-jnlp-file jnlp;
application/x-makeself run;
application/x-perl pl pm;
application/x-pilot prc pdb;
application/x-rar-compressed rar;
application/x-redhat-package-manager rpm;
application/x-sea sea;
application/x-shockwave-flash swf;
application/x-stuffit sit;
application/x-tcl tcl tk;
application/x-x509-ca-cert der pem crt;
application/x-xpinstall xpi;
application/zip zip;
application/octet-stream bin exe dll;
application/octet-stream deb;
application/octet-stream dmg;
application/octet-stream eot;
application/octet-stream iso img;
application/octet-stream msi msp msm;
audio/midi mid midi kar;
audio/mpeg mp3;
audio/x-realaudio ra;
video/3gpp 3gpp 3gp;
video/mpeg mpeg mpg;
video/quicktime mov;
video/x-flv flv;
video/x-mng mng;
video/x-ms-asf asx asf;
video/x-ms-wmv wmv;
video/x-msvideo avi;
}
pid logs/nginx.pid;
error_log logs/nginx-main_error.log;
worker_processes 2;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
access_log logs/nginx-http_access.log;
error_log logs/nginx-http_error.log;
tcp_nopush on;
tcp_nodelay on;
keepalive_timeout 10;
send_timeout 10;
client_body_timeout 10;
client_header_timeout 10;
sendfile on;
client_header_buffer_size 1k;
large_client_header_buffers 2 4k;
client_max_body_size 1k;
client_body_buffer_size 1k;
ignore_invalid_headers on;
client_body_in_single_buffer on;
server {
listen 80;
server_name localhost;
location /pub {
# activate publisher mode for this location
push_stream_publisher;
# query string based channel id
set $push_stream_channel_id $arg_id;
# message template
push_stream_message_template "<script>p(~id~,'~channel~','~text~');</script>";
# max messages to store in memory
push_stream_max_message_buffer_length 20;
# message ttl
push_stream_min_message_buffer_timeout 5m;
# client_max_body_size MUST be equal to client_body_buffer_size or
# you will be sorry.
client_max_body_size 32k;
client_body_buffer_size 32k;
}
location ~ /sub/(.*) {
# activate subscriber mode for this location
push_stream_subscriber;
# positional channel path
set $push_stream_channels_path $1;
# header to be sent when receiving new subscriber connection
push_stream_header_template "<html><head><meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\">\r\n<meta http-equiv=\"Cache-Control\" content=\"no-store\">\r\n<meta http-equiv=\"Cache-Control\" content=\"no-cache\">\r\n<meta http-equiv=\"Pragma\" content=\"no-cache\">\r\n<meta http-equiv=\"Expires\" content=\"Thu, 1 Jan 1970 00:00:00 GMT\">\r\n<script type=\"text/javascript\">\r\nwindow.onError = null;\r\ndocument.domain = 'localhost';\r\nparent.PushStream.register(this);\r\n</script>\r\n</head>\r\n<body onload=\"try { parent.PushStream.reset(this) } catch (e) {}\">";
# message template
push_stream_message_template "<script>p(~id~,'~channel~','~text~');</script>";
# content-type
push_stream_content_type "text/html; charset=utf-8";
# subscriber may create channels on demand or only authorized
# (publisher) may do it?
push_stream_authorized_channels_only off;
# ping frequency
push_stream_ping_message_interval 10s;
# disconnection candidates test frequency
push_stream_subscriber_disconnect_interval 30s;
# connection ttl to enable recycle
push_stream_subscriber_connection_timeout 15m;
# solving some leakage problem with persitent connections in
# Nginx's chunked filter (ngx_http_chunked_filter_module.c)
chunked_transfer_encoding off;
}
}
}
#!/bin/bash
TAG="$1"
PREFIX="nginx-push-stream-module"
if [[ -z "$TAG" ]]
then
echo "Usage: $0 <tag>"
exit 1
fi
mkdir -p build
git archive --format=tar --prefix=$PREFIX/ $TAG src config | gzip > build/$PREFIX-$TAG.tar.gz
echo "Package generated: build/$PREFIX-$TAG.tar.gz"
#include <ngx_http_push_stream_module.h>
#include <ngx_http_push_stream_rbtree_util.c>
#include <ngx_http_push_stream_module_utils.c>
#include <ngx_http_push_stream_module_ipc.c>
#include <ngx_http_push_stream_module_setup.c>
#include <ngx_http_push_stream_module_publisher.c>
#include <ngx_http_push_stream_module_subscriber.c>
static void
ngx_http_push_stream_send_response_channel_id_not_provided(ngx_http_request_t *r)
{
ngx_buf_t *buf = ngx_create_temp_buf(r->pool, 0);
ngx_chain_t *chain;
if (buf != NULL) {
buf->pos = (u_char *) NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE;
buf->last = buf->pos + sizeof(NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE) - 1;
buf->start = buf->pos;
buf->end = buf->last;
chain = ngx_http_push_stream_create_output_chain(buf, r->pool, r->connection->log);
chain->buf->last_buf = 1;
r->headers_out.content_length_n = ngx_buf_size(buf);
r->headers_out.status = NGX_HTTP_NOT_FOUND;
r->headers_out.content_type = NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_TEXT_PLAIN;
ngx_http_send_header(r);
ngx_http_output_filter(r, chain);
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "push stream module: the $push_stream_channel_id variable is required but is not set");
}
}
static ngx_str_t *
ngx_http_push_stream_get_channel_id(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *cf)
{
ngx_http_variable_value_t *vv = ngx_http_get_indexed_variable(r, cf->index_channel_id);
size_t len;
ngx_str_t *id;
if (vv == NULL || vv->not_found || vv->len == 0) {
ngx_http_push_stream_send_response_channel_id_not_provided(r);
return NULL;
}
// maximum length limiter for channel id
len = vv->len <= cf->max_channel_id_length ? vv->len : cf->max_channel_id_length;
if ((id = ngx_pcalloc(r->pool, sizeof(*id) + len + 1)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for $push_stream_channel_id string");
return NULL;
}
id->len = len;
id->data = (u_char *) (id + 1);
ngx_memcpy(id->data, vv->data, len);
return id;
}
static void
ngx_http_push_stream_match_channel_info_subtype(size_t off, u_char *cur, size_t rem, u_char **priority, const ngx_str_t **format, ngx_str_t *content_type)
{
static ngx_http_push_stream_content_subtype_t subtypes[] = {
{ "json" , 4, &NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_JSON },
{ "yaml" , 4, &NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_YAML },
{ "xml" , 3, &NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_XML },
{ "x-json", 6, &NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_JSON },
{ "x-yaml", 6, &NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_YAML }
};
u_char *start = cur + off;
ngx_uint_t i;
for(i=0; i<(sizeof(subtypes) / sizeof(ngx_http_push_stream_content_subtype_t)); i++) {
if (ngx_strncmp(start, subtypes[i].subtype, rem<subtypes[i].len ? rem : subtypes[i].len) == 0) {
if (*priority > start) {
*format = subtypes[i].format;
*priority = start;
content_type->data = cur;
content_type->len = off + 1 + subtypes[i].len;
}
}
}
}
static ngx_buf_t *
ngx_http_push_stream_channel_info_formatted(ngx_pool_t *pool, ngx_str_t channelId, ngx_uint_t published_messages, ngx_uint_t stored_messages, ngx_uint_t subscribers, const ngx_str_t *format)
{
ngx_buf_t *b;
ngx_uint_t len;
len = channelId.len + 3*NGX_INT_T_LEN + format->len - 8; // minus 8 sprintf
if ((b = ngx_create_temp_buf(pool, len)) == NULL) {
return NULL;
}
ngx_memset(b->start, '\0', len);
b->last = ngx_sprintf(b->start, (char *) format->data, channelId.data, published_messages, stored_messages, subscribers);
return b;
}
// print information about a channel
static ngx_int_t
ngx_http_push_stream_channel_info(ngx_http_request_t *r, ngx_str_t channelId, ngx_uint_t published_messages, ngx_uint_t stored_messages, ngx_uint_t subscribers)
{
ngx_buf_t *b;
ngx_str_t content_type = NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_TEXT_PLAIN;
const ngx_str_t *format = &NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_PLAIN;
ngx_int_t rc;
ngx_chain_t *chain;
if (r->headers_in.accept) {
// lame content-negotiation (without regard for qvalues)
u_char *accept = r->headers_in.accept->value.data;
size_t len = r->headers_in.accept->value.len;
size_t rem;
u_char *cur = accept;
u_char *priority = &accept[len - 1];
for(rem=len; (cur = ngx_strnstr(cur, "text/", rem)) != NULL; cur += sizeof("text/") - 1) {
rem = len - ((size_t) (cur-accept) + sizeof("text/") - 1);
if (ngx_strncmp(cur + sizeof("text/") - 1, "plain", rem < 5 ? rem : 5) == 0) {
if (priority) {
format = &NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_PLAIN;
priority = cur + sizeof("text/") - 1;
// content-type is already set by default
}
}
ngx_http_push_stream_match_channel_info_subtype(sizeof("text/") - 1, cur, rem, &priority, &format, &content_type);
}
cur = accept;
for(rem=len; (cur = ngx_strnstr(cur, "application/", rem)) != NULL; cur += sizeof("application/") - 1) {
rem = len - ((size_t) (cur-accept) + sizeof("application/") - 1);
ngx_http_push_stream_match_channel_info_subtype(sizeof("application/") - 1, cur, rem, &priority, &format, &content_type);
}
}
r->headers_out.content_type.len = content_type.len;
r->headers_out.content_type.data = content_type.data;
if ((b = ngx_http_push_stream_channel_info_formatted(r->pool, channelId, published_messages, stored_messages, subscribers, format)) == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
// lastly, set the content-length, because if the status code isn't 200, nginx may not do so automatically
r->headers_out.content_length_n = ngx_buf_size(b);
if (ngx_http_send_header(r) > NGX_HTTP_SPECIAL_RESPONSE) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
chain = ngx_http_push_stream_create_output_chain(b, r->pool, r->connection->log);
rc = ngx_http_output_filter(r, chain);
return rc;
}
static void
ngx_http_push_stream_rbtree_walker_channel_info_locked(ngx_rbtree_t *tree, ngx_pool_t *pool, ngx_rbtree_node_t *node, ngx_queue_t *queue_channel_info, const ngx_str_t *format)
{
ngx_rbtree_node_t *sentinel = tree->sentinel;
if (node != sentinel) {
ngx_http_push_stream_channel_t *channel = (ngx_http_push_stream_channel_t *) node;
ngx_http_push_stream_msg_t *msg;
if ((msg = ngx_pcalloc(pool, sizeof(ngx_http_push_stream_msg_t))) == NULL) {
return;
}
if ((msg->buf = ngx_http_push_stream_channel_info_formatted(pool, channel->id, channel->last_message_id, channel->stored_messages, channel->subscribers, format)) == NULL) {
return;
}
ngx_queue_insert_tail(queue_channel_info, &msg->queue);
if (node->left != NULL) {
ngx_http_push_stream_rbtree_walker_channel_info_locked(tree, pool, node->left, queue_channel_info, format);
}
if (node->right != NULL) {
ngx_http_push_stream_rbtree_walker_channel_info_locked(tree, pool, node->right, queue_channel_info, format);
}
}
}
// print information about all channels
static ngx_int_t
ngx_http_push_stream_all_channels_info(ngx_http_request_t *r)
{
ngx_buf_t *b;
ngx_uint_t len = 0;
ngx_str_t content_type = NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_JSON;
const ngx_str_t *format = &NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_JSON;
const ngx_str_t *head = &NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_HEAD_JSON;
const ngx_str_t *tail = &NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_TAIL_JSON;
ngx_chain_t *chain;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_rbtree_t *tree = &((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->tree;
ngx_queue_t queue_channel_info;
ngx_queue_t *cur;
ngx_tm_t tm;
u_char currenttime[20];
u_char hostname[ngx_cycle->hostname.len + 1];
r->headers_out.content_type.len = content_type.len;
r->headers_out.content_type.data = content_type.data;
ngx_queue_init(&queue_channel_info);
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_rbtree_walker_channel_info_locked(tree, r->pool, tree->root, &queue_channel_info, format);
ngx_shmtx_unlock(&shpool->mutex);
if ((chain = ngx_pcalloc(r->pool, sizeof(*chain))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for response channels info");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_gmtime(ngx_time(), &tm);
ngx_sprintf(currenttime, (char *) NGX_PUSH_STREAM_DATE_FORMAT_ISO_8601.data, tm.ngx_tm_year, tm.ngx_tm_mon, tm.ngx_tm_mday, tm.ngx_tm_hour, tm.ngx_tm_min, tm.ngx_tm_sec);
currenttime[19] = '\0';
ngx_memcpy(hostname, (char *) ngx_cycle->hostname.data, ngx_cycle->hostname.len);
hostname[ngx_cycle->hostname.len] = '\0';
if ((b = ngx_create_temp_buf(r->pool, head->len + sizeof(hostname) + sizeof(currenttime))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for response channels info head/tail");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
b->last = ngx_sprintf(b->last, (char *) head->data, hostname, currenttime);
// calculates the size required to send the information from each channel
cur = ngx_queue_head(&queue_channel_info);
while (cur != &queue_channel_info) {
ngx_http_push_stream_msg_t *msg = (ngx_http_push_stream_msg_t *) cur;
len += ngx_buf_size(msg->buf) + NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_ITEM_SEP_JSON.len;
cur = ngx_queue_next(cur);
}
// sum the size of tail and formatted head messages
len += tail->len + ngx_buf_size(b);
// lastly, set the content-length, because if the status code isn't 200, nginx may not do so automatically
r->headers_out.content_length_n = len;
if (ngx_http_send_header(r) > NGX_HTTP_SPECIAL_RESPONSE) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
// send the head message
b->last_buf = 1;
b->memory = 1;
chain->buf = b;
ngx_http_output_filter(r, chain);
cur = ngx_queue_head(&queue_channel_info);
while (cur != &queue_channel_info) {
ngx_http_push_stream_msg_t *msg = (ngx_http_push_stream_msg_t *) cur;
chain->buf = msg->buf;
ngx_http_output_filter(r, chain);
cur = ngx_queue_next(cur);
// sends the separate information
chain->buf = b;
if (cur != &queue_channel_info) {
chain->buf->pos = NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_ITEM_SEP_JSON.data;
chain->buf->last = NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_ITEM_SEP_JSON.data + NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_ITEM_SEP_JSON.len;
} else {
chain->buf->pos = NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_ITEM_SEP_LAST_ITEM_JSON.data;
chain->buf->last = NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_ITEM_SEP_LAST_ITEM_JSON.data + NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_ITEM_SEP_LAST_ITEM_JSON.len;
}
chain->buf->start = chain->buf->pos;
chain->buf->end = chain->buf->last;
ngx_http_output_filter(r, chain);
}
// send the tail message
chain->buf = b;
chain->buf->pos = tail->data;
chain->buf->last = tail->data + tail->len;
chain->buf->start = chain->buf->pos;
chain->buf->end = chain->buf->last;
ngx_http_output_filter(r, chain);
return NGX_HTTP_OK;
}
static void
ngx_http_push_stream_reserve_message_locked(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg)
{
if (!msg->persistent) {
msg->refcount++;
}
// we need a refcount because channel messages MAY be dequed before they are used up. It thus falls on the IPC stuff to free it.
}
static void
ngx_http_push_stream_release_message_locked(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg)
{
if (!msg->persistent) {
msg->refcount--;
if (msg->queue.next == NULL && msg->refcount <= 0) {
// message had been dequeued and nobody needs it anymore
ngx_http_push_stream_free_message_locked(msg, ngx_http_push_stream_shpool);
}
if (channel->stored_messages > msg->delete_oldest_received_min_messages && ngx_http_push_stream_get_oldest_message_locked(channel) == msg) {
ngx_http_push_stream_delete_message_locked(channel, msg, ngx_http_push_stream_shpool);
}
}
}
static ngx_int_t
ngx_http_push_stream_respond_status_only(ngx_http_request_t *r, ngx_int_t status_code, const ngx_str_t *statusline)
{
r->headers_out.status = status_code;
if (statusline != NULL) {
r->headers_out.status_line.len = statusline->len;
r->headers_out.status_line.data = statusline->data;
}
r->headers_out.content_length_n = 0;
r->header_only = 1;
return ngx_http_send_header(r);
}
#ifndef NGX_HTTP_PUSH_STREAM_MODULE_H_
#define NGX_HTTP_PUSH_STREAM_MODULE_H_
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <ngx_channel.h>
#include <nginx.h>
typedef struct {
size_t shm_size;
} ngx_http_push_stream_main_conf_t;
typedef struct {
ngx_int_t index_channel_id;
ngx_int_t index_channels_path;
time_t buffer_timeout;
ngx_int_t min_messages;
ngx_int_t max_messages;
ngx_int_t authorize_channel;
ngx_int_t store_messages;
ngx_int_t delete_oldest_received_message;
ngx_int_t max_channel_id_length;
ngx_str_t header_template;
ngx_str_t message_template;
ngx_str_t content_type;
ngx_msec_t ping_message_interval;
ngx_msec_t subscriber_disconnect_interval;
time_t subscriber_connection_timeout;
ngx_str_t broadcast_channel_prefix;
ngx_int_t broadcast_channel_max_qtd;
} ngx_http_push_stream_loc_conf_t;
// variables
static ngx_str_t ngx_http_push_stream_channel_id = ngx_string("push_stream_channel_id");
static ngx_str_t ngx_http_push_stream_channels_path = ngx_string("push_stream_channels_path");
// shared memory segment name
static ngx_str_t ngx_push_stream_shm_name = ngx_string("push_stream_module");
// message queue
typedef struct {
ngx_queue_t queue; // this MUST be first
ngx_buf_t *buf;
time_t expires;
ngx_uint_t delete_oldest_received_min_messages; // NGX_MAX_UINT32_VALUE for 'never'
ngx_int_t refcount;
ngx_flag_t persistent;
} ngx_http_push_stream_msg_t;
typedef struct ngx_http_push_stream_subscriber_cleanup_s ngx_http_push_stream_subscriber_cleanup_t;
// subscriber request queue
typedef struct {
ngx_queue_t queue; // this MUST be first
ngx_http_request_t *request;
ngx_http_push_stream_subscriber_cleanup_t *clndata;
} ngx_http_push_stream_subscriber_t;
typedef struct {
ngx_queue_t queue;
pid_t pid;
ngx_int_t slot;
ngx_http_push_stream_subscriber_t *subscriber_sentinel;
} ngx_http_push_stream_pid_queue_t;
// our typecast-friendly rbtree node (channel)
typedef struct {
ngx_rbtree_node_t node; // this MUST be first
ngx_str_t id;
ngx_http_push_stream_msg_t *message_queue;
ngx_uint_t last_message_id;
ngx_uint_t stored_messages;
ngx_http_push_stream_pid_queue_t workers_with_subscribers;
ngx_uint_t subscribers;
} ngx_http_push_stream_channel_t;
typedef struct {
ngx_queue_t queue;
ngx_http_push_stream_subscriber_t *subscriber;
ngx_http_push_stream_channel_t *channel;
} ngx_http_push_stream_subscription_t;
typedef struct {
ngx_queue_t queue; // this MUST be first
ngx_http_request_t *request;
ngx_http_push_stream_subscription_t *subscriptions_sentinel;
ngx_http_push_stream_subscriber_cleanup_t *clndata;
ngx_pid_t worker_subscribed_pid;
time_t expires;
} ngx_http_push_stream_worker_subscriber_t;
// cleaning supplies
struct ngx_http_push_stream_subscriber_cleanup_s {
ngx_http_push_stream_worker_subscriber_t *worker_subscriber;
};
typedef struct {
ngx_queue_t queue;
ngx_http_push_stream_msg_t *msg;
} ngx_http_push_stream_msg_queue_t;
// garbage collecting goodness
typedef struct {
ngx_queue_t queue;
ngx_http_push_stream_channel_t *channel;
} ngx_http_push_stream_channel_queue_t;
// messages to worker processes
typedef struct {
ngx_queue_t queue;
ngx_http_push_stream_msg_t *msg; // ->shared memory
ngx_int_t status_code;
ngx_pid_t pid;
ngx_http_push_stream_channel_t *channel; // ->shared memory
ngx_http_push_stream_subscriber_t *subscriber_sentinel; // ->a worker's local pool
} ngx_http_push_stream_worker_msg_t;
typedef struct {
ngx_queue_t messages_queue;
ngx_http_push_stream_worker_subscriber_t *worker_subscribers_sentinel;
} ngx_http_push_stream_worker_data_t;
// shared memory
typedef struct {
ngx_rbtree_t tree;
ngx_uint_t channels; // # of channels being used
ngx_http_push_stream_worker_data_t *ipc; // interprocess stuff
} ngx_http_push_stream_shm_data_t;
typedef struct {
char *subtype;
size_t len;
const ngx_str_t *format;
} ngx_http_push_stream_content_subtype_t;
ngx_event_t ngx_http_push_stream_ping_event;
ngx_event_t ngx_http_push_stream_disconnect_event;
ngx_int_t ngx_http_push_stream_worker_processes;
ngx_pool_t *ngx_http_push_stream_pool;
ngx_slab_pool_t *ngx_http_push_stream_shpool;
ngx_shm_zone_t *ngx_http_push_stream_shm_zone = NULL;
ngx_chain_t *ngx_http_push_stream_header_chain = NULL;
ngx_chain_t *ngx_http_push_stream_crlf_chain = NULL;
ngx_http_push_stream_msg_t *ngx_http_push_stream_ping_msg = NULL;
ngx_buf_t *ngx_http_push_stream_ping_buf = NULL;
// emergency garbage collecting goodness;
ngx_http_push_stream_channel_queue_t channel_gc_sentinel;
// garbage-collecting shared memory slab allocation
void * ngx_http_push_stream_slab_alloc_locked(size_t size);
static ngx_int_t ngx_http_push_stream_channel_collector(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool);
// setup
static ngx_int_t ngx_http_push_stream_init_module(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_push_stream_init_worker(ngx_cycle_t *cycle);
static void ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle);
static void ngx_http_push_stream_exit_master(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_push_stream_postconfig(ngx_conf_t *cf);
static void * ngx_http_push_stream_create_main_conf(ngx_conf_t *cf);
static void * ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf);
static char * ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child);
// subscriber
static char * ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static ngx_int_t ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r);
static ngx_int_t ngx_http_push_stream_broadcast_locked(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_int_t status_code, const ngx_str_t *status_line, ngx_log_t *log, ngx_slab_pool_t *shpool);
#define ngx_http_push_stream_broadcast_status_locked(channel, status_code, status_line, log, shpool) ngx_http_push_stream_broadcast_locked(channel, NULL, status_code, status_line, log, shpool)
#define ngx_http_push_stream_broadcast_message_locked(channel, msg, log, shpool) ngx_http_push_stream_broadcast_locked(channel, msg, 0, NULL, log, shpool)
static ngx_int_t ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *sentinel, ngx_http_push_stream_msg_t *msg, ngx_int_t status_code, const ngx_str_t *status_line);
static void ngx_http_push_stream_subscriber_cleanup(ngx_http_push_stream_subscriber_cleanup_t *data);
static void ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber);
// publisher
static char * ngx_http_push_stream_publisher(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static ngx_int_t ngx_http_push_stream_publisher_handler(ngx_http_request_t *r);
static void ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r);
// channel
static void ngx_http_push_stream_send_response_channel_id_not_provided(ngx_http_request_t *r);
static ngx_str_t * ngx_http_push_stream_get_channel_id(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *cf);
static ngx_int_t ngx_http_push_stream_channel_info(ngx_http_request_t *r, ngx_str_t channelId, ngx_uint_t published_message_queue_size, ngx_uint_t stored_message_queue_size, ngx_uint_t subscriber_queue_size);
static ngx_int_t ngx_http_push_stream_all_channels_info(ngx_http_request_t *r);
static ngx_http_push_stream_channel_t * ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log);
static ngx_http_push_stream_channel_t * ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_delete_channel_locked(ngx_http_push_stream_channel_t *trash);
static ngx_http_push_stream_channel_t * ngx_http_push_stream_clean_channel_locked(ngx_http_push_stream_channel_t *channel);
// channel messages
static void ngx_http_push_stream_reserve_message_locked(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg);
static void ngx_http_push_stream_release_message_locked(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg);
static ngx_inline void ngx_http_push_stream_general_delete_message_locked(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_int_t force, ngx_slab_pool_t *shpool);
#define ngx_http_push_stream_delete_message_locked(channel, msg, shpool) ngx_http_push_stream_general_delete_message_locked(channel, msg, 0, shpool)
#define ngx_http_push_stream_force_delete_message_locked(channel, msg, shpool) ngx_http_push_stream_general_delete_message_locked(channel, msg, 1, shpool)
static ngx_inline void ngx_http_push_stream_free_message_locked(ngx_http_push_stream_msg_t *msg, ngx_slab_pool_t *shpool);
// utilities
// general request handling
static void ngx_http_push_stream_copy_preallocated_buffer(ngx_buf_t *buf, ngx_buf_t *cbuf);
static ngx_table_elt_t * ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value);
static ngx_int_t ngx_http_push_stream_respond_status_only(ngx_http_request_t *r, ngx_int_t status_code, const ngx_str_t *statusline);
static ngx_chain_t * ngx_http_push_stream_create_output_chain_general(ngx_buf_t *buf, ngx_pool_t *pool, ngx_log_t *log, ngx_slab_pool_t *shpool);
#define ngx_http_push_stream_create_output_chain(buf, pool, log) ngx_http_push_stream_create_output_chain_general(buf, pool, log, NULL)
#define ngx_http_push_stream_create_output_chain_locked(buf, pool, log, shpool) ngx_http_push_stream_create_output_chain_general(buf, pool, log, shpool)
static ngx_int_t ngx_http_push_stream_send_body_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf);
static ngx_int_t ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf);
static void ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_ping_timer_set(ngx_http_push_stream_loc_conf_t *pslcf);
static void ngx_http_push_stream_ping_timer_reset(ngx_http_push_stream_loc_conf_t *pslcf);
static void ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev);
static void ngx_http_push_stream_disconnect_timer_set(ngx_http_push_stream_loc_conf_t *pslcf);
static void ngx_http_push_stream_disconnect_timer_reset(ngx_http_push_stream_loc_conf_t *pslcf);
static u_char * ngx_http_push_stream_str_replace_locked(u_char *org, u_char *find, u_char *replace, ngx_pool_t *temp_pool);
static ngx_buf_t * ngx_http_push_stream_get_formatted_message_locked(ngx_http_push_stream_loc_conf_t *pslcf, ngx_http_push_stream_channel_t *channel, ngx_buf_t *buf, ngx_pool_t *temp_pool);
// shared memory
static ngx_int_t ngx_http_push_stream_set_up_shm(ngx_conf_t *cf, size_t shm_size);
static ngx_int_t ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data);
static ngx_int_t ngx_http_push_stream_movezig_channel_locked(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool);
// ipc
static ngx_int_t ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers);
static void ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_push_stream_init_ipc_shm(ngx_int_t workers);
static void ngx_http_push_stream_channel_handler(ngx_event_t *ev);
static ngx_inline void ngx_http_push_stream_process_worker_message(void);
static ngx_inline void ngx_http_push_stream_send_worker_ping_message(void);
static ngx_inline void ngx_http_push_stream_disconnect_worker_subscribers(ngx_flag_t force_disconnect);
static ngx_inline void ngx_http_push_stream_census_worker_subscribers(void);
static void ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *subscriber_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_int_t status_code, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_alert_worker_send_ping(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_alert_worker_disconnect_subscribers(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log);
static ngx_int_t ngx_http_push_stream_alert_worker_census_subscribers(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log);
// constants
#define NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES 49
#define NGX_CMD_HTTP_PUSH_STREAM_SEND_PING 50
#define NGX_CMD_HTTP_PUSH_STREAM_DISCONNECT_SUBSCRIBERS 51
#define NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS 52
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID = ngx_string("ALL");
#define NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE "No channel id provided."
#define NGX_HTTP_PUSH_STREAM_MAX_CHANNEL_ID_LENGTH 1024 // bytes
#define NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE 33554432 // 32 megs
#define NGX_HTTP_PUSH_STREAM_DEFAULT_BUFFER_TIMEOUT 7200
#define NGX_HTTP_PUSH_STREAM_DEFAULT_MIN_MESSAGES 1
#define NGX_HTTP_PUSH_STREAM_DEFAULT_MAX_MESSAGES 10
#define NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE ""
#define NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TEMPLATE ""
#define NGX_HTTP_PUSH_STREAM_DEFAULT_CONTENT_TYPE "text/plain"
#define NGX_HTTP_PUSH_STREAM_DEFAULT_BROADCAST_CHANNEL_PREFIX ""
static const ngx_str_t NGX_HTTP_PUSH_STREAM_BACKTRACK_SEP = ngx_string(".b");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_SLASH = ngx_string("/");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CRLF = ngx_string("\r\n");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_TEXT_PLAIN = ngx_string("text/plain");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CONTENT_TYPE_JSON = ngx_string("application/json");
static const ngx_str_t NGX_PUSH_STREAM_TOKEN_MESSAGE_ID = ngx_string("~id~");
static const ngx_str_t NGX_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL = ngx_string("~channel~");
static const ngx_str_t NGX_PUSH_STREAM_TOKEN_MESSAGE_TEXT = ngx_string("~text~");
static const ngx_str_t NGX_PUSH_STREAM_PING_MESSAGE_ID = ngx_string("-1");
static const ngx_str_t NGX_PUSH_STREAM_PING_MESSAGE_TEXT = ngx_string("");
static const ngx_str_t NGX_PUSH_STREAM_PING_CHANNEL_ID = ngx_string("");
static const ngx_str_t NGX_PUSH_STREAM_DATE_FORMAT_ISO_8601 = ngx_string("%4d-%02d-%02dT%02d:%02d:%02d");
// message codes
#define NGX_HTTP_PUSH_STREAM_MESSAGE_RECEIVED 9000
#define NGX_HTTP_PUSH_STREAM_MESSAGE_QUEUED 9001
#define NGX_HTTP_PUSH_STREAM_MESSAGE_FOUND 1000
#define NGX_HTTP_PUSH_STREAM_MESSAGE_EXPECTED 1001
#define NGX_HTTP_PUSH_STREAM_MESSAGE_EXPIRED 1002
#ifndef NGX_HTTP_CONFLICT
#define NGX_HTTP_CONFLICT 409
#endif
#ifndef NGX_HTTP_GONE
#define NGX_HTTP_GONE 410
#endif
#ifndef NGX_HTTP_CREATED
#define NGX_HTTP_CREATED 201
#endif
#ifndef NGX_HTTP_ACCEPTED
#define NGX_HTTP_ACCEPTED 202
#endif
// headers
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_ETAG = ngx_string("Etag");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_VARY = ngx_string("Vary");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HEADER_ALLOW = ngx_string("Allow");
// header values
//const ngx_str_t NGX_HTTP_PUSH_CACHE_CONTROL_VALUE = ngx_string("no-cache");
// status strings
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HTTP_STATUS_409 = ngx_string("409 Conflict");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_HTTP_STATUS_410 = ngx_string("410 Gone");
// other stuff
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_PUT_DELETE = ngx_string("GET, POST, PUT, DELETE");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_ALLOW_GET = ngx_string("GET");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_VARY_HEADER_VALUE = ngx_string("If-None-Match, If-Modified-Since");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_PLAIN = ngx_string(
"channel: %s" CRLF
"published_messages: %ui" CRLF
"stored_messages: %ui" CRLF
"active_subscribers: %ui" CRLF
"\0");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_HEAD_JSON = ngx_string("{\"hostname\": \"%s\", \"time\": \"%s\", \"infos\": [" CRLF);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_GROUP_TAIL_JSON = ngx_string("]}" CRLF);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_ITEM_SEP_JSON = ngx_string("," CRLF);
// have to be the same size of NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_ITEM_SEP_JSON
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_ITEM_SEP_LAST_ITEM_JSON = ngx_string(" " CRLF);
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_JSON = ngx_string(
"{\"channel\": \"%s\", "
"\"published_messages\": \"%ui\", "
"\"stored_messages\": \"%ui\", "
"\"subscribers\": \"%ui\"}"
"\0");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_XML = ngx_string(
"<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" CRLF
"<channel>" CRLF
" <name>%s</name>" CRLF
" <published_messages>%ui</published_messages>" CRLF
" <stored_messages>%ui</stored_messages>" CRLF
" <subscribers>%ui</subscribers>" CRLF
"</channel>" CRLF
"\0");
static const ngx_str_t NGX_HTTP_PUSH_STREAM_CHANNEL_INFO_YAML = ngx_string(
"---" CRLF
"channel: %s" CRLF
"published_messages: %ui" CRLF
"stored_messages: %ui" CRLF
"subscribers %ui" CRLF
"\0");
#endif /* NGX_HTTP_PUSH_STREAM_MODULE_H_ */
#include <ngx_http_push_stream_module.h>
// worker processes of the world, unite.
ngx_socket_t ngx_http_push_stream_socketpairs[NGX_MAX_PROCESSES][2];
static ngx_int_t
ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers)
{
int i, s = 0, on = 1;
ngx_int_t last_expected_process = ngx_last_process;
/*
* here's the deal: we have no control over fork()ing, nginx's internal
* socketpairs are unusable for our purposes (as of nginx 0.8 -- check the
* code to see why), and the module initialization callbacks occur before
* any workers are spawned. Rather than futzing around with existing
* socketpairs, we populate our own socketpairs array.
* Trouble is, ngx_spawn_process() creates them one-by-one, and we need to
* do it all at once. So we must guess all the workers' ngx_process_slots in
* advance. Meaning the spawning logic must be copied to the T.
*/
for(i=0; i<workers; i++) {
while (s < last_expected_process && ngx_processes[s].pid != -1) {
// find empty existing slot
s++;
}
// copypaste from os/unix/ngx_process.c (ngx_spawn_process)
ngx_socket_t *socks = ngx_http_push_stream_socketpairs[s];
if (socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "socketpair() failed on socketpair while initializing push stream module");
return NGX_ERROR;
}
if (ngx_nonblocking(socks[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed on socketpair while initializing push stream module");
ngx_close_channel(socks, cycle->log);
return NGX_ERROR;
}
if (ngx_nonblocking(socks[1]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, ngx_nonblocking_n " failed on socketpair while initializing push stream module");
ngx_close_channel(socks, cycle->log);
return NGX_ERROR;
}
if (ioctl(socks[0], FIOASYNC, &on) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "ioctl(FIOASYNC) failed on socketpair while initializing push stream module");
ngx_close_channel(socks, cycle->log);
return NGX_ERROR;
}
if (fcntl(socks[0], F_SETOWN, ngx_pid) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(F_SETOWN) failed on socketpair while initializing push stream module");
ngx_close_channel(socks, cycle->log);
return NGX_ERROR;
}
if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed on socketpair while initializing push stream module");
ngx_close_channel(socks, cycle->log);
return NGX_ERROR;
}
if (fcntl(socks[1], F_SETFD, FD_CLOEXEC) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fcntl(FD_CLOEXEC) failed while initializing push stream module");
ngx_close_channel(socks, cycle->log);
return NGX_ERROR;
}
s++; // NEXT!!
}
return NGX_OK;
}
static void
ngx_http_push_stream_ipc_exit_worker(ngx_cycle_t *cycle)
{
ngx_close_channel((ngx_socket_t *) ngx_http_push_stream_socketpairs[ngx_process_slot], cycle->log);
}
static ngx_int_t
ngx_http_push_stream_reset_channel_subscribers_count_locked(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool)
{
channel->subscribers = 0;
return NGX_OK;
}
// will be called many times
static ngx_int_t
ngx_http_push_stream_init_ipc_shm(ngx_int_t workers)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *d = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
ngx_http_push_stream_worker_data_t *workers_data;
int i;
ngx_shmtx_lock(&shpool->mutex);
if (d->ipc != NULL) {
// already initialized... reset channel subscribers counters and census subscribers
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_worker_subscriber_t *sentinel = thisworker_data->worker_subscribers_sentinel;
ngx_queue_init(&sentinel->queue);
ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_reset_channel_subscribers_count_locked);
ngx_shmtx_unlock(&shpool->mutex);
for(i=0; i<workers; i++) {
ngx_http_push_stream_alert_worker_census_subscribers(ngx_pid, i, ngx_http_push_stream_pool->log);
}
return NGX_OK;
}
// initialize worker message queues
if ((workers_data = ngx_slab_alloc_locked(shpool, sizeof(*workers_data)*workers)) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
return NGX_ERROR;
}
for(i=0; i<workers; i++) {
ngx_queue_init(&workers_data[i].messages_queue);
if ((workers_data[i].worker_subscribers_sentinel = ngx_slab_alloc_locked(shpool, sizeof(*workers_data[i].worker_subscribers_sentinel))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
return NGX_ERROR;
}
ngx_queue_init(&((ngx_http_push_stream_worker_subscriber_t *) workers_data[i].worker_subscribers_sentinel)->queue);
}
d->ipc = workers_data;
ngx_shmtx_unlock(&shpool->mutex);
return NGX_OK;
}
static ngx_int_t
ngx_http_push_stream_register_worker_message_handler(ngx_cycle_t *cycle)
{
if (ngx_add_channel_event(cycle, ngx_http_push_stream_socketpairs[ngx_process_slot][1], NGX_READ_EVENT, ngx_http_push_stream_channel_handler) == NGX_ERROR) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "failed to register channel handler while initializing push stream module worker");
return NGX_ERROR;
}
return NGX_OK;
}
static void
ngx_http_push_stream_channel_handler(ngx_event_t *ev)
{
// copypaste from os/unix/ngx_process_cycle.c (ngx_channel_handler)
ngx_int_t n;
ngx_channel_t ch;
ngx_connection_t *c;
if (ev->timedout) {
ev->timedout = 0;
return;
}
c = ev->data;
while (1) {
n = ngx_read_channel(c->fd, &ch, sizeof(ch), ev->log);
if (n == NGX_ERROR) {
if (ngx_event_flags & NGX_USE_EPOLL_EVENT) {
ngx_del_conn(c, 0);
}
ngx_close_connection(c);
return;
}
if ((ngx_event_flags & NGX_USE_EVENTPORT_EVENT) && (ngx_add_event(ev, NGX_READ_EVENT, 0) == NGX_ERROR)) {
return;
}
if (n == NGX_AGAIN) {
return;
}
//ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, "push stream module: channel command: %d", ch.command);
if (ch.command == NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES) {
ngx_http_push_stream_process_worker_message();
} else if (ch.command == NGX_CMD_HTTP_PUSH_STREAM_SEND_PING) {
ngx_http_push_stream_send_worker_ping_message();
} else if (ch.command == NGX_CMD_HTTP_PUSH_STREAM_DISCONNECT_SUBSCRIBERS) {
// disconnect only expired subscribers (force_disconnect = 0)
ngx_http_push_stream_disconnect_worker_subscribers(0);
} else if (ch.command == NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS) {
ngx_http_push_stream_census_worker_subscribers();
}
}
}
static ngx_int_t
ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log)
{
// seems ch doesn't need to have fd set. odd, but roll with it. pid and process slot also unnecessary.
static ngx_channel_t ch = {NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES, 0, 0, -1};
return ngx_write_channel(ngx_http_push_stream_socketpairs[slot][0], &ch, sizeof(ngx_channel_t), log);
}
static ngx_int_t
ngx_http_push_stream_alert_worker_send_ping(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log)
{
// seems ch doesn't need to have fd set. odd, but roll with it. pid and process slot also unnecessary.
static ngx_channel_t ch = {NGX_CMD_HTTP_PUSH_STREAM_SEND_PING, 0, 0, -1};
return ngx_write_channel(ngx_http_push_stream_socketpairs[slot][0], &ch, sizeof(ngx_channel_t), log);
}
static ngx_int_t
ngx_http_push_stream_alert_worker_disconnect_subscribers(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log)
{
// seems ch doesn't need to have fd set. odd, but roll with it. pid and process slot also unnecessary.
static ngx_channel_t ch = {NGX_CMD_HTTP_PUSH_STREAM_DISCONNECT_SUBSCRIBERS, 0, 0, -1};
return ngx_write_channel(ngx_http_push_stream_socketpairs[slot][0], &ch, sizeof(ngx_channel_t), log);
}
static ngx_int_t
ngx_http_push_stream_alert_worker_census_subscribers(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log)
{
// seems ch doesn't need to have fd set. odd, but roll with it. pid and process slot also unnecessary.
static ngx_channel_t ch = {NGX_CMD_HTTP_PUSH_STREAM_CENSUS_SUBSCRIBERS, 0, 0, -1};
return ngx_write_channel(ngx_http_push_stream_socketpairs[slot][0], &ch, sizeof(ngx_channel_t), log);
}
static ngx_inline void
ngx_http_push_stream_census_worker_subscribers(void)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_worker_subscriber_t *sentinel = thisworker_data->worker_subscribers_sentinel;
ngx_http_push_stream_worker_subscriber_t *cur, *next;
ngx_shmtx_lock(&shpool->mutex);
cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&sentinel->queue);
while (cur != sentinel) {
next = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&cur->queue);
ngx_http_push_stream_subscription_t *cur_subscription, *sentinel_subscription;
sentinel_subscription = cur->subscriptions_sentinel;
cur_subscription = (ngx_http_push_stream_subscription_t *) ngx_queue_head(&sentinel_subscription->queue);
while (cur_subscription != sentinel_subscription) {
cur_subscription->channel->subscribers++;
cur_subscription = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&cur_subscription->queue);
}
cur = next;
}
ngx_shmtx_unlock(&shpool->mutex);
}
static ngx_inline void
ngx_http_push_stream_disconnect_worker_subscribers(ngx_flag_t force_disconnect)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_worker_subscriber_t *sentinel = thisworker_data->worker_subscribers_sentinel;
ngx_http_push_stream_worker_subscriber_t *cur, *next;
ngx_shmtx_lock(&shpool->mutex);
time_t now = ngx_time();
cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&sentinel->queue);
while (cur != sentinel) {
next = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&cur->queue);
// in this block, nothing in shared memory should be dereferenced.
ngx_http_request_t *r = cur->request;
if (r != NULL) {
if ((force_disconnect == 1) || ((cur->expires != 0) && (now > cur->expires))) {
ngx_http_push_stream_worker_subscriber_cleanup_locked(cur);
ngx_http_finalize_request(r, NGX_HTTP_OK);
} else {
break;
}
}
cur = next;
}
ngx_shmtx_unlock(&shpool->mutex);
}
static ngx_inline void
ngx_http_push_stream_send_worker_ping_message(void)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_push_stream_worker_subscriber_t *sentinel = thisworker_data->worker_subscribers_sentinel;
// copy everything we need first
ngx_chain_t *chain;
ngx_http_request_t *r;
ngx_buf_t *buffer;
u_char *pos;
ngx_http_push_stream_worker_subscriber_t *cur, *next;
ngx_pool_t *temp_pool;
ngx_shmtx_lock(&shpool->mutex);
if ((temp_pool = ngx_create_pool(NGX_CYCLE_POOL_SIZE, ngx_http_push_stream_pool->log)) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, ngx_http_push_stream_pool->log, 0, "push stream module: unable to allocate memory for temporary pool");
return;
}
// preallocate output chain. yes, same one for every waiting subscriber
if ((chain = ngx_http_push_stream_create_output_chain_locked(ngx_http_push_stream_ping_msg->buf, temp_pool, ngx_http_push_stream_pool->log, shpool)) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push stream module: unable to create output chain while responding to several subscriber request");
ngx_destroy_pool(temp_pool);
return;
}
buffer = chain->buf;
pos = buffer->pos;
ngx_shmtx_unlock(&shpool->mutex);
buffer->last_buf = 0;
cur = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&sentinel->queue);
// now let's respond to some requests!
while (cur != sentinel) {
next = (ngx_http_push_stream_worker_subscriber_t *) ngx_queue_next(&cur->queue);
// in this block, nothing in shared memory should be dereferenced.
r = cur->request;
if (r != NULL) {
r->discard_body = 0; // hacky hacky!
ngx_http_output_filter(r, chain);
ngx_http_send_special(r, NGX_HTTP_FLUSH);
// rewind the buffer, please
buffer->pos = pos;
buffer->last_buf = 0;
}
cur = next;
}
ngx_destroy_pool(temp_pool);
}
static ngx_inline void
ngx_http_push_stream_process_worker_message(void)
{
ngx_http_push_stream_worker_msg_t *prev_worker_msg, *worker_msg, *sentinel;
const ngx_str_t *status_line = NULL;
ngx_http_push_stream_channel_t *channel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_subscriber_t *subscriber_sentinel;
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_int_t status_code;
ngx_http_push_stream_msg_t *msg;
sentinel = (ngx_http_push_stream_worker_msg_t *) &thisworker_data->messages_queue;
worker_msg = (ngx_http_push_stream_worker_msg_t *) ngx_queue_next(&sentinel->queue);
while (worker_msg != sentinel) {
if (worker_msg->pid == ngx_pid) {
// everything is okay
status_code = worker_msg->status_code;
msg = worker_msg->msg;
channel = worker_msg->channel;
subscriber_sentinel = worker_msg->subscriber_sentinel;
if (msg == NULL) {
// just a status line, is all
// status code only
switch (status_code) {
case NGX_HTTP_CONFLICT:
status_line = &NGX_HTTP_PUSH_STREAM_HTTP_STATUS_409;
break;
case NGX_HTTP_GONE:
status_line = &NGX_HTTP_PUSH_STREAM_HTTP_STATUS_410;
break;
case 0:
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push stream module: worker message contains neither a channel message nor a status code");
ngx_shmtx_lock(&shpool->mutex);
// let's let the subscribers know that something went wrong and they might've missed a message
status_code = NGX_HTTP_INTERNAL_SERVER_ERROR;
// intentional fall-through
default:
status_line = NULL;
}
}
ngx_shmtx_unlock(&shpool->mutex);
ngx_http_push_stream_respond_to_subscribers(channel, subscriber_sentinel, msg, status_code, status_line);
ngx_shmtx_lock(&shpool->mutex);
} else {
// that's quite bad you see. a previous worker died with an undelivered message.
// but all its subscribers' connections presumably got canned, too. so it's not so bad after all.
ngx_http_push_stream_pid_queue_t *channel_worker_sentinel = &worker_msg->channel->workers_with_subscribers;
ngx_http_push_stream_pid_queue_t *channel_worker_cur = channel_worker_sentinel;
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push stream module: worker %i intercepted a message intended for another worker process (%i) that probably died", ngx_pid, worker_msg->pid);
ngx_shmtx_lock(&shpool->mutex);
// delete that invalid sucker
while ((channel_worker_cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&channel_worker_cur->queue)) != channel_worker_sentinel) {
if (channel_worker_cur->pid == worker_msg->pid) {
ngx_queue_remove(&channel_worker_cur->queue);
ngx_slab_free_locked(shpool, channel_worker_cur->subscriber_sentinel);
ngx_slab_free_locked(shpool, channel_worker_cur);
break;
}
}
}
// It may be worth it to memzero worker_msg for debugging purposes.
prev_worker_msg = worker_msg;
worker_msg = (ngx_http_push_stream_worker_msg_t *) ngx_queue_next(&worker_msg->queue);
ngx_slab_free_locked(shpool, prev_worker_msg);
}
ngx_queue_init(&thisworker_data->messages_queue); // reset the worker message sentinel
ngx_shmtx_unlock(&shpool->mutex);
return;
}
static ngx_int_t
ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *subscriber_sentinel, ngx_pid_t pid, ngx_int_t worker_slot, ngx_http_push_stream_msg_t *msg, ngx_int_t status_code, ngx_log_t *log)
{
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + worker_slot;
ngx_http_push_stream_worker_msg_t *newmessage;
ngx_shmtx_lock(&shpool->mutex);
if ((newmessage = ngx_slab_alloc_locked(shpool, sizeof(*newmessage))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: unable to allocate worker message");
return NGX_ERROR;
}
ngx_queue_insert_tail(&thisworker_data->messages_queue, &newmessage->queue);
newmessage->msg = msg;
newmessage->status_code = status_code;
newmessage->pid = pid;
newmessage->subscriber_sentinel = subscriber_sentinel;
newmessage->channel = channel;
ngx_shmtx_unlock(&shpool->mutex);
return NGX_OK;
}
#include <ngx_http_push_stream_module.h>
static ngx_int_t
ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
{
ngx_int_t rc;
/*
* Instruct ngx_http_read_subscriber_request_body to store the request
* body entirely in a memory buffer or in a file.
*/
r->request_body_in_single_buf = 1;
r->request_body_in_persistent_file = 1;
r->request_body_in_clean_file = 0;
r->request_body_file_log_level = 0;
r->keepalive = 0;
rc = ngx_http_read_client_request_body(r, ngx_http_push_stream_publisher_body_handler);
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
return NGX_DONE;
}
static void
ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
{
ngx_str_t *id;
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_buf_t *buf = NULL, *buf_copy, *buf_msg = NULL;
ngx_http_push_stream_channel_t *channel;
ngx_uint_t method = r->method;
ngx_uint_t subscribers = 0;
ngx_uint_t published_messages = 0;
ngx_uint_t stored_messages = 0;
if ((id = ngx_http_push_stream_get_channel_id(r, cf)) == NULL) {
ngx_http_finalize_request(r, r->headers_out.status ? NGX_OK : NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
ngx_shmtx_lock(&shpool->mutex);
// POST requests will need a channel created if it doesn't yet exist.
if ((ngx_strstr(id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data) != (const char *) id->data) && (method == NGX_HTTP_POST || method == NGX_HTTP_PUT)) {
channel = ngx_http_push_stream_get_channel(id, r->connection->log);
NGX_HTTP_PUSH_STREAM_PUBLISHER_CHECK_LOCKED(channel, NULL, r, "push stream module: unable to allocate memory for new channel", shpool);
} else { // no other request method needs that.
// just find the channel. if it's not there, NULL.
channel = ngx_http_push_stream_find_channel(id, r->connection->log);
}
if (channel != NULL) {
subscribers = channel->subscribers;
published_messages = channel->last_message_id;
stored_messages = channel->stored_messages;
} else if ((method != NGX_HTTP_GET) || (ngx_strstr(id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data) != (const char *) id->data)) {
// 404!
ngx_shmtx_unlock(&shpool->mutex);
r->headers_out.status = NGX_HTTP_NOT_FOUND;
// just the headers, please. we don't care to describe the situation or
// respond with an html page
r->headers_out.content_length_n = 0;
r->header_only = 1;
ngx_http_finalize_request(r, ngx_http_send_header(r));
return;
}
ngx_shmtx_unlock(&shpool->mutex);
switch (method) {
ngx_http_push_stream_msg_t *msg;
ngx_http_push_stream_msg_t *sentinel;
case NGX_HTTP_POST:
// first off, we'll want to extract the body buffer
// note: this works mostly because of r->request_body_in_single_buf = 1;
// which, i suppose, makes this module a little slower than it could be.
// this block is a little hacky. might be a thorn for forward-compatibility.
if (r->headers_in.content_length_n == -1 || r->headers_in.content_length_n == 0) {
buf = ngx_create_temp_buf(r->pool, 0);
// this buffer will get copied to shared memory in a few lines,
// so it does't matter what pool we make it in.
} else if (r->request_body->bufs->buf != NULL) { // everything in the first buffer, please
buf = r->request_body->bufs->buf;
} else if (r->request_body->bufs->next != NULL) {
buf = r->request_body->bufs->next->buf;
} else {
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, "push stream module: unexpected publisher message request body buffer location. please report this to the push stream module developers.");
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
if ((r->headers_in.content_length_n > 0) && (buf != NULL)) {
buf->last = buf->pos + r->headers_in.content_length_n;
*buf->last = '\0';
}
NGX_HTTP_PUSH_STREAM_PUBLISHER_CHECK(buf, NULL, r, "push stream module: can't find or allocate publisher request body buffer");
ngx_shmtx_lock(&shpool->mutex);
buf_msg = ngx_http_push_stream_get_formatted_message_locked(cf, channel, buf, r->pool);
// create a buffer copy in shared mem
msg = ngx_http_push_stream_slab_alloc_locked(sizeof(*msg));
NGX_HTTP_PUSH_STREAM_PUBLISHER_CHECK_LOCKED(msg, NULL, r, "push stream module: unable to allocate message in shared memory", shpool);
buf_copy = ngx_http_push_stream_slab_alloc_locked(NGX_HTTP_PUSH_STREAM_BUF_ALLOC_SIZE(buf_msg));
NGX_HTTP_PUSH_STREAM_PUBLISHER_CHECK_LOCKED(buf_copy, NULL, r, "push stream module: unable to allocate buffer in shared memory", shpool) // magic nullcheck
ngx_http_push_stream_copy_preallocated_buffer(buf_msg, buf_copy);
msg->buf = buf_copy;
channel->last_message_id++;
if (cf->store_messages) {
ngx_queue_insert_tail(&channel->message_queue->queue, &msg->queue);
channel->stored_messages++;
}
// set message expiration time
time_t message_timeout = cf->buffer_timeout;
msg->expires = (message_timeout == 0 ? 0 : (ngx_time() + message_timeout));
msg->persistent = (message_timeout == 0 ? 1 : 0);
msg->delete_oldest_received_min_messages = cf->delete_oldest_received_message ? (ngx_uint_t) cf->min_messages : NGX_MAX_UINT32_VALUE;
// NGX_MAX_UINT32_VALUE to disable, otherwise = min_message_buffer_size of the publisher location from whence the message came
// FMI (For My Information): shm is still locked.
switch (ngx_http_push_stream_broadcast_message_locked(channel, msg, r->connection->log, shpool)) {
case NGX_HTTP_PUSH_STREAM_MESSAGE_QUEUED:
// message was queued successfully, but there were no
// subscribers to receive it.
r->headers_out.status = NGX_HTTP_ACCEPTED;
r->headers_out.status_line.len = sizeof("202 Accepted") - 1;
r->headers_out.status_line.data = (u_char *) "202 Accepted";
break;
case NGX_HTTP_PUSH_STREAM_MESSAGE_RECEIVED:
// message was queued successfully, and it was already sent
// to at least one subscriber
r->headers_out.status = NGX_HTTP_CREATED;
r->headers_out.status_line.len = sizeof("201 Created") - 1;
r->headers_out.status_line.data = (u_char *) "201 Created";
// update the number of times the message was received.
// in the interest of premature optimization, I assume all
// current subscribers have received the message successfully.
break;
case NGX_ERROR:
// WTF?
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: error broadcasting message to workers");
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
default:
// for debugging, mostly. I don't expect this branch to be
// hit during regular operation
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: TOTALLY UNEXPECTED error broadcasting message to workers");
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
// shm is still locked I hope.
if (buf->file != NULL) {
// future subscribers won't be able to use this file descriptor --
// it will be closed once the publisher request is finalized.
// (That's about to happen a handful of lines below.)
msg->buf->file->fd = NGX_INVALID_FILE;
}
// now see if the queue is too big
if (channel->stored_messages > (ngx_uint_t) cf->max_messages) {
// exceeeds max queue size. force-delete oldest message
ngx_http_push_stream_force_delete_message_locked(channel, ngx_http_push_stream_get_oldest_message_locked(channel), shpool);
}
if (channel->stored_messages > (ngx_uint_t) cf->min_messages) {
// exceeeds min queue size. maybe delete the oldest message
ngx_http_push_stream_msg_t *oldest_msg = ngx_http_push_stream_get_oldest_message_locked(channel);
NGX_HTTP_PUSH_STREAM_PUBLISHER_CHECK_LOCKED(oldest_msg, NULL, r, "push stream module: oldest message not found", shpool);
}
published_messages = channel->last_message_id;
stored_messages = channel->stored_messages;
ngx_shmtx_unlock(&shpool->mutex);
ngx_http_finalize_request(r, ngx_http_push_stream_channel_info(r, channel->id, published_messages, stored_messages, subscribers));
return;
case NGX_HTTP_PUT:
case NGX_HTTP_GET:
r->headers_out.status = NGX_HTTP_OK;
if (ngx_strstr(id->data, NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID.data) == (const char *) id->data) {
ngx_http_finalize_request(r, ngx_http_push_stream_all_channels_info(r));
} else {
ngx_http_finalize_request(r, ngx_http_push_stream_channel_info(r, channel->id, published_messages, stored_messages, subscribers));
}
return;
case NGX_HTTP_DELETE:
ngx_shmtx_lock(&shpool->mutex);
sentinel = channel->message_queue;
msg = sentinel;
while ((msg = (ngx_http_push_stream_msg_t *) ngx_queue_next(&msg->queue)) != sentinel) {
// force-delete all the messages
ngx_http_push_stream_force_delete_message_locked(NULL, msg, shpool);
}
channel->last_message_id = 0;
channel->stored_messages = 0;
published_messages = channel->last_message_id;
stored_messages = channel->stored_messages;
// 410 gone
NGX_HTTP_PUSH_STREAM_PUBLISHER_CHECK_LOCKED(ngx_http_push_stream_broadcast_status_locked(channel, NGX_HTTP_GONE, &NGX_HTTP_PUSH_STREAM_HTTP_STATUS_410, r->connection->log, shpool), NGX_ERROR, r, "push stream module: unable to send current subscribers a 410 Gone response", shpool);
ngx_http_push_stream_delete_channel_locked(channel);
ngx_shmtx_unlock(&shpool->mutex);
// done.
r->headers_out.status = NGX_HTTP_OK;
ngx_http_finalize_request(r, ngx_http_push_stream_channel_info(r, channel->id, published_messages, stored_messages, subscribers));
return;
default:
// some other weird request method
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ALLOW, &NGX_HTTP_PUSH_STREAM_ALLOW_GET_POST_PUT_DELETE);
ngx_http_finalize_request(r, NGX_HTTP_NOT_ALLOWED);
return;
}
}
#include <ngx_http_push_stream_module.h>
static ngx_command_t ngx_http_push_stream_commands[] = {
{ ngx_string("push_stream_publisher"),
NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_NOARGS,
ngx_http_push_stream_publisher,
NGX_HTTP_LOC_CONF_OFFSET,
0,
NULL },
{ ngx_string("push_stream_subscriber"),
NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_NOARGS,
ngx_http_push_stream_subscriber,
NGX_HTTP_LOC_CONF_OFFSET,
0,
NULL },
{ ngx_string("push_stream_max_reserved_memory"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_size_slot,
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_push_stream_main_conf_t, shm_size),
NULL },
{ ngx_string("push_stream_store_messages"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, store_messages),
NULL },
{ ngx_string("push_stream_delete_oldest_received_message"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, delete_oldest_received_message),
NULL },
{ ngx_string("push_stream_min_message_buffer_timeout"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_sec_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, buffer_timeout),
NULL },
{ ngx_string("push_stream_min_message_buffer_length"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, min_messages),
NULL },
{ ngx_string("push_stream_max_message_buffer_length"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, max_messages),
NULL },
{ ngx_string("push_stream_max_channel_id_length"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, max_channel_id_length),
NULL },
{ ngx_string("push_stream_authorized_channels_only"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, authorize_channel),
NULL },
{ ngx_string("push_stream_header_template"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, header_template),
NULL },
{ ngx_string("push_stream_message_template"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, message_template),
NULL },
{ ngx_string("push_stream_content_type"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, content_type),
NULL },
{ ngx_string("push_stream_ping_message_interval"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, ping_message_interval),
NULL },
{ ngx_string("push_stream_subscriber_disconnect_interval"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, subscriber_disconnect_interval),
NULL },
{ ngx_string("push_stream_subscriber_connection_timeout"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_sec_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, subscriber_connection_timeout),
NULL },
{ ngx_string("push_stream_broadcast_channel_prefix"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, broadcast_channel_prefix),
NULL },
{ ngx_string("push_stream_broadcast_channel_max_qtd"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_stream_loc_conf_t, broadcast_channel_max_qtd),
NULL },
ngx_null_command
};
static ngx_http_module_t ngx_http_push_stream_module_ctx = {
NULL, /* preconfiguration */
ngx_http_push_stream_postconfig, /* postconfiguration */
ngx_http_push_stream_create_main_conf, /* create main configuration */
NULL, /* init main configuration */
NULL, /* create server configuration */
NULL, /* merge server configuration */
ngx_http_push_stream_create_loc_conf, /* create location configuration */
ngx_http_push_stream_merge_loc_conf, /* merge location configuration */
};
ngx_module_t ngx_http_push_stream_module = {
NGX_MODULE_V1,
&ngx_http_push_stream_module_ctx, /* module context */
ngx_http_push_stream_commands, /* module directives */
NGX_HTTP_MODULE, /* module type */
NULL, /* init master */
ngx_http_push_stream_init_module, /* init module */
ngx_http_push_stream_init_worker, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
ngx_http_push_stream_exit_worker, /* exit process */
ngx_http_push_stream_exit_master, /* exit master */
NGX_MODULE_V1_PADDING
};
static ngx_int_t
ngx_http_push_stream_init_module(ngx_cycle_t *cycle)
{
ngx_core_conf_t *ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
ngx_http_push_stream_worker_processes = ccf->worker_processes;
// initialize subscriber queues
// pool, please
if ((ngx_http_push_stream_pool = ngx_create_pool(NGX_CYCLE_POOL_SIZE, cycle->log)) == NULL) { // I trust the cycle pool size to be a well-tuned one.
return NGX_ERROR;
}
if (ngx_http_push_stream_ping_msg == NULL) {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_shmtx_lock(&shpool->mutex);
if (ngx_http_push_stream_ping_msg == NULL) {
if ((ngx_http_push_stream_ping_msg = ngx_http_push_stream_slab_alloc_locked(sizeof(ngx_http_push_stream_msg_t))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, cycle->log, 0, "push stream module: unable to allocate memory for ngx_http_push_stream_ping_msg");
return NGX_ERROR;
}
ngx_http_push_stream_ping_msg->expires = 0;
ngx_http_push_stream_ping_msg->delete_oldest_received_min_messages = NGX_MAX_UINT32_VALUE;
ngx_http_push_stream_ping_msg->persistent = 1;
}
ngx_shmtx_unlock(&shpool->mutex);
}
NGX_HTTP_PUSH_STREAM_MAKE_IN_MEMORY_CHAIN(ngx_http_push_stream_header_chain, ngx_http_push_stream_pool, "push stream module: unable to allocate chain to send header to new subscribers");
NGX_HTTP_PUSH_STREAM_MAKE_IN_MEMORY_CHAIN(ngx_http_push_stream_crlf_chain, ngx_http_push_stream_pool, "push stream module: unable to allocate chain to send crlf to subscribers, on flush");
// initialize our little IPC
return ngx_http_push_stream_init_ipc(cycle, ngx_http_push_stream_worker_processes);
}
static ngx_int_t
ngx_http_push_stream_init_worker(ngx_cycle_t *cycle)
{
if ((ngx_http_push_stream_init_ipc_shm(ngx_http_push_stream_worker_processes)) != NGX_OK) {
return NGX_ERROR;
}
return ngx_http_push_stream_register_worker_message_handler(cycle);
}
static void
ngx_http_push_stream_exit_master(ngx_cycle_t *cycle)
{
ngx_pfree(ngx_http_push_stream_pool, ngx_http_push_stream_ping_buf);
ngx_pfree(ngx_http_push_stream_pool, ngx_http_push_stream_ping_msg);
ngx_free_chain(ngx_http_push_stream_pool, ngx_http_push_stream_header_chain);
ngx_free_chain(ngx_http_push_stream_pool, ngx_http_push_stream_crlf_chain);
// destroy channel tree in shared memory
ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_movezig_channel_locked);
}
static void
ngx_http_push_stream_exit_worker(ngx_cycle_t *cycle)
{
// disconnect all subscribers (force_disconnect = 1)
ngx_http_push_stream_disconnect_worker_subscribers(1);
if (ngx_http_push_stream_ping_event.timer_set) {
ngx_del_timer(&ngx_http_push_stream_ping_event);
}
ngx_http_push_stream_ipc_exit_worker(cycle);
}
static ngx_int_t
ngx_http_push_stream_postconfig(ngx_conf_t *cf)
{
ngx_http_push_stream_main_conf_t *conf = ngx_http_conf_get_module_main_conf(cf, ngx_http_push_stream_module);
size_t shm_size;
// initialize shared memory
if (conf->shm_size == NGX_CONF_UNSET_SIZE) {
conf->shm_size = NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_SIZE;
}
shm_size = ngx_align(conf->shm_size, ngx_pagesize);
if (shm_size < 8 * ngx_pagesize) {
ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "The push_stream_max_reserved_memory value must be at least %udKiB", (8 * ngx_pagesize) >> 10);
shm_size = 8 * ngx_pagesize;
}
if (ngx_http_push_stream_shm_zone && ngx_http_push_stream_shm_zone->shm.size != shm_size) {
ngx_conf_log_error(NGX_LOG_WARN, cf, 0, "Cannot change memory area size without restart, ignoring change");
}
ngx_conf_log_error(NGX_LOG_INFO, cf, 0, "Using %udKiB of shared memory for push stream module", shm_size >> 10);
return ngx_http_push_stream_set_up_shm(cf, shm_size);
}
// main config
static void *
ngx_http_push_stream_create_main_conf(ngx_conf_t *cf)
{
ngx_http_push_stream_main_conf_t *mcf = ngx_pcalloc(cf->pool, sizeof(*mcf));
if (mcf == NULL) {
return NGX_CONF_ERROR;
}
mcf->shm_size = NGX_CONF_UNSET_SIZE;
return mcf;
}
// location config stuff
static void *
ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
{
ngx_http_push_stream_loc_conf_t *lcf = ngx_pcalloc(cf->pool, sizeof(*lcf));
if (lcf == NULL) {
return NGX_CONF_ERROR;
}
lcf->buffer_timeout = NGX_CONF_UNSET;
lcf->max_messages = NGX_CONF_UNSET;
lcf->min_messages = NGX_CONF_UNSET;
lcf->authorize_channel = NGX_CONF_UNSET;
lcf->store_messages = NGX_CONF_UNSET;
lcf->delete_oldest_received_message = NGX_CONF_UNSET;
lcf->max_channel_id_length = NGX_CONF_UNSET;
lcf->message_template.data = NULL;
lcf->header_template.data = NULL;
lcf->ping_message_interval = NGX_CONF_UNSET_MSEC;
lcf->content_type.data = NULL;
lcf->subscriber_disconnect_interval = NGX_CONF_UNSET_MSEC;
lcf->subscriber_connection_timeout = NGX_CONF_UNSET;
lcf->broadcast_channel_prefix.data = NULL;
lcf->broadcast_channel_max_qtd = NGX_CONF_UNSET;
return lcf;
}
static char *
ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
ngx_http_push_stream_loc_conf_t *prev = parent, *conf = child;
ngx_conf_merge_sec_value(conf->buffer_timeout, prev->buffer_timeout, NGX_HTTP_PUSH_STREAM_DEFAULT_BUFFER_TIMEOUT);
ngx_conf_merge_value(conf->max_messages, prev->max_messages, NGX_HTTP_PUSH_STREAM_DEFAULT_MAX_MESSAGES);
ngx_conf_merge_value(conf->min_messages, prev->min_messages, NGX_HTTP_PUSH_STREAM_DEFAULT_MIN_MESSAGES);
ngx_conf_merge_value(conf->authorize_channel, prev->authorize_channel, 1);
ngx_conf_merge_value(conf->store_messages, prev->store_messages, 1);
ngx_conf_merge_value(conf->delete_oldest_received_message, prev->delete_oldest_received_message, 0);
ngx_conf_merge_value(conf->max_channel_id_length, prev->max_channel_id_length, NGX_HTTP_PUSH_STREAM_MAX_CHANNEL_ID_LENGTH);
ngx_conf_merge_str_value(conf->header_template, prev->header_template, NGX_HTTP_PUSH_STREAM_DEFAULT_HEADER_TEMPLATE);
ngx_conf_merge_str_value(conf->message_template, prev->message_template, NGX_HTTP_PUSH_STREAM_DEFAULT_MESSAGE_TEMPLATE);
ngx_conf_merge_msec_value(conf->ping_message_interval, prev->ping_message_interval, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_str_value(conf->content_type, prev->content_type, NGX_HTTP_PUSH_STREAM_DEFAULT_CONTENT_TYPE);
ngx_conf_merge_msec_value(conf->subscriber_disconnect_interval, prev->subscriber_disconnect_interval, NGX_CONF_UNSET_MSEC);
ngx_conf_merge_sec_value(conf->subscriber_connection_timeout, prev->subscriber_connection_timeout, NGX_CONF_UNSET);
ngx_conf_merge_str_value(conf->broadcast_channel_prefix, prev->broadcast_channel_prefix, NGX_HTTP_PUSH_STREAM_DEFAULT_BROADCAST_CHANNEL_PREFIX);
ngx_conf_merge_value(conf->broadcast_channel_max_qtd, prev->broadcast_channel_max_qtd, 1);
// sanity checks
if (conf->max_messages < conf->min_messages) {
// min/max buffer size makes sense?
ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push_stream_max_message_buffer_length cannot be smaller than push_stream_min_message_buffer_length.");
return NGX_CONF_ERROR;
}
return NGX_CONF_OK;
}
static char *
ngx_http_push_stream_setup_handler(ngx_conf_t *cf, void *conf, ngx_int_t (*handler) (ngx_http_request_t *))
{
ngx_http_core_loc_conf_t *clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
ngx_http_push_stream_loc_conf_t *pslcf = conf;
clcf->handler = handler;
clcf->if_modified_since = NGX_HTTP_IMS_OFF;
pslcf->index_channel_id = ngx_http_get_variable_index(cf, &ngx_http_push_stream_channel_id);
if (pslcf->index_channel_id == NGX_ERROR) {
return NGX_CONF_ERROR;
}
return NGX_CONF_OK;
}
static char *
ngx_http_push_stream_publisher(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
return ngx_http_push_stream_setup_handler(cf, conf, &ngx_http_push_stream_publisher_handler);
}
static char *
ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
char *rc = ngx_http_push_stream_setup_handler(cf, conf, &ngx_http_push_stream_subscriber_handler);
if (rc == NGX_CONF_OK) {
ngx_http_push_stream_loc_conf_t *pslcf = conf;
pslcf->index_channels_path = ngx_http_get_variable_index(cf, &ngx_http_push_stream_channels_path);
if (pslcf->index_channels_path == NGX_ERROR) {
rc = NGX_CONF_ERROR;
}
}
return rc;
}
// shared memory
static ngx_int_t
ngx_http_push_stream_set_up_shm(ngx_conf_t *cf, size_t shm_size)
{
ngx_http_push_stream_shm_zone = ngx_shared_memory_add(cf, &ngx_push_stream_shm_name, shm_size, &ngx_http_push_stream_module);
if (ngx_http_push_stream_shm_zone == NULL) {
return NGX_ERROR;
}
ngx_http_push_stream_shm_zone->init = ngx_http_push_stream_init_shm_zone;
ngx_http_push_stream_shm_zone->data = (void *) 1;
return NGX_OK;
}
// shared memory zone initializer
static ngx_int_t
ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)
{
if (data) { /* zone already initialized */
shm_zone->data = data;
return NGX_OK;
}
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) shm_zone->shm.addr;
ngx_rbtree_node_t *sentinel;
ngx_http_push_stream_shm_data_t *d;
ngx_http_push_stream_shpool = shpool; // we'll be using this a bit.
if ((d = (ngx_http_push_stream_shm_data_t *) ngx_slab_alloc(shpool, sizeof(*d))) == NULL) { //shm_data plus an array.
return NGX_ERROR;
}
shm_zone->data = d;
d->ipc = NULL;
// initialize rbtree
if ((sentinel = ngx_slab_alloc(shpool, sizeof(*sentinel))) == NULL) {
return NGX_ERROR;
}
ngx_rbtree_init(&d->tree, sentinel, ngx_http_push_stream_rbtree_insert);
return NGX_OK;
}
// great justice appears to be at hand
static ngx_int_t
ngx_http_push_stream_movezig_channel_locked(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool)
{
ngx_queue_t *sentinel = &channel->message_queue->queue;
ngx_http_push_stream_msg_t *msg = NULL;
while (!ngx_queue_empty(sentinel)) {
msg = ngx_queue_data(ngx_queue_head(sentinel), ngx_http_push_stream_msg_t, queue);
ngx_http_push_stream_force_delete_message_locked(channel, msg, shpool);
}
return NGX_OK;
}
#include <ngx_http_push_stream_module.h>
static ngx_int_t
ngx_http_push_stream_subscriber_assign_channel_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_str_t *id, ngx_uint_t backtrack_messages, ngx_queue_t *messages_to_sent_queue, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_pool_t *temp_pool)
{
ngx_http_push_stream_pid_queue_t *sentinel, *cur, *found;
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_subscriber_t *subscriber;
ngx_http_push_stream_subscriber_t *subscriber_sentinel;
ngx_queue_t *node;
ngx_http_push_stream_subscription_t *subscription;
ngx_flag_t is_broadcast_channel = 0;
if ((cf->broadcast_channel_max_qtd > 0) && (cf->broadcast_channel_prefix.len > 0)) {
u_char *broad_pos = (u_char *) ngx_strstr(id->data, cf->broadcast_channel_prefix.data);
if ((broad_pos != NULL) && (broad_pos == id->data)) {
is_broadcast_channel = 1;
}
}
channel = (((cf->authorize_channel == 1) && (is_broadcast_channel == 0)) ? ngx_http_push_stream_find_channel : ngx_http_push_stream_get_channel) (id, r->connection->log);
if (channel == NULL) {
// unable to allocate channel OR channel not found
ngx_shmtx_unlock(&shpool->mutex);
if (cf->authorize_channel) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: not authorized to access channel %s", id->data);
return NGX_HTTP_FORBIDDEN;
} else {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate shared memory for channel %s", id->data);
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
}
if ((channel->stored_messages == 0) && !is_broadcast_channel && cf->authorize_channel) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: not authorized to access channel %s, channel is empty of messages", id->data);
return NGX_HTTP_FORBIDDEN;
}
sentinel = &channel->workers_with_subscribers;
cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_head(&sentinel->queue);
found = NULL;
while (cur != sentinel) {
if (cur->pid == ngx_pid) {
found = cur;
break;
}
cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue);
}
if (found == NULL) { // found nothing
if ((found = ngx_http_push_stream_slab_alloc_locked(sizeof(*found))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate worker subscriber queue marker in shared memory");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
// initialize
ngx_queue_insert_tail(&sentinel->queue, &found->queue);
found->pid = ngx_pid;
found->slot = ngx_process_slot;
found->subscriber_sentinel = NULL;
}
// figure out the subscriber sentinel
subscriber_sentinel = ((ngx_http_push_stream_pid_queue_t *) found)->subscriber_sentinel;
if (subscriber_sentinel == NULL) {
// it's perfectly nornal for the sentinel to be NULL
if ((subscriber_sentinel=ngx_http_push_stream_slab_alloc_locked(sizeof(*subscriber_sentinel))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate channel subscriber sentinel");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_queue_init(&subscriber_sentinel->queue);
((ngx_http_push_stream_pid_queue_t *) found)->subscriber_sentinel=subscriber_sentinel;
}
if ((subscription = ngx_palloc(r->pool, sizeof(*subscription))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate subscribed channel reference");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
if ((subscriber = ngx_palloc(r->pool, sizeof(*subscriber))) == NULL) { // unable to allocate request queue element
return NGX_ERROR;
}
subscriber->request = r;
subscription->channel = channel;
subscription->subscriber = subscriber;
channel->subscribers++; // do this only when we know everything went okay
// get old messages to send to new subscriber
if (channel->stored_messages > 0) {
node = ngx_queue_last(&channel->message_queue->queue);
ngx_uint_t qtd = (backtrack_messages > channel->stored_messages) ? channel->stored_messages : backtrack_messages;
while (qtd > 0) {
ngx_http_push_stream_msg_queue_t *message = NULL;
if ((message = ngx_palloc(temp_pool, sizeof(*message))) != NULL) {
message->msg = (ngx_http_push_stream_msg_t *) node;
ngx_queue_insert_head(messages_to_sent_queue, &message->queue);
}
node = ngx_queue_prev(node);
qtd--;
}
}
ngx_queue_insert_tail(&subscriptions_sentinel->queue, &subscription->queue);
ngx_queue_insert_tail(&subscriber_sentinel->queue, &subscriber->queue);
return NGX_OK;
}
static ngx_int_t
ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
{
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module);
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *)ngx_http_push_stream_shm_zone->shm.addr;
ngx_str_t *id, *channels_path;
ngx_http_push_stream_worker_subscriber_t *worker_subscriber = NULL;
ngx_http_push_stream_subscriber_cleanup_t *clndata;
ngx_pool_cleanup_t *cln;
ngx_pool_t *temp_pool;
ngx_queue_t messages_to_sent_queue;
ngx_http_variable_value_t *vv_channels_path = ngx_http_get_indexed_variable(r, cf->index_channels_path);
if (vv_channels_path == NULL || vv_channels_path->not_found || vv_channels_path->len == 0) {
ngx_http_push_stream_send_response_channel_id_not_provided(r);
return NGX_HTTP_NOT_FOUND;
}
if (r->method != NGX_HTTP_GET) {
ngx_http_push_stream_add_response_header(r, &NGX_HTTP_PUSH_STREAM_HEADER_ALLOW, &NGX_HTTP_PUSH_STREAM_ALLOW_GET); // valid HTTP for teh win
return NGX_HTTP_NOT_ALLOWED;
}
if ((temp_pool = ngx_create_pool(NGX_CYCLE_POOL_SIZE, r->connection->log)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for temporary pool");
return NGX_ERROR;
}
if ((id = ngx_pcalloc(temp_pool, sizeof(*id) + vv_channels_path->len + 1)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channel_id string");
return NGX_ERROR;
}
id->data = (u_char *) (id + 1);
if ((channels_path = ngx_pcalloc(temp_pool, sizeof(*channels_path) + vv_channels_path->len + 1)) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for channels_path string");
return NGX_ERROR;
}
channels_path->data = (u_char *) (channels_path + 1);
channels_path->len = vv_channels_path->len;
ngx_memcpy(channels_path->data, vv_channels_path->data, vv_channels_path->len);
if ((worker_subscriber=ngx_palloc(r->pool, sizeof(*worker_subscriber))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate worker subscriber");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
if ((worker_subscriber->subscriptions_sentinel = ngx_palloc(r->pool, sizeof(*worker_subscriber->subscriptions_sentinel))) == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate subscribed channels sentinel");
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
worker_subscriber->request = r;
worker_subscriber->worker_subscribed_pid = ngx_pid;
time_t subscriber_timeout = cf->subscriber_connection_timeout;
worker_subscriber->expires = ((subscriber_timeout == NGX_CONF_UNSET) || (subscriber_timeout == 0)) ? 0 : (ngx_time() + subscriber_timeout);
ngx_queue_init(&worker_subscriber->queue);
ngx_queue_init(&worker_subscriber->subscriptions_sentinel->queue);
// attach a cleaner to remove the request from the channel
if ((cln = ngx_pool_cleanup_add(r->pool, sizeof(*clndata))) == NULL) { // make sure we can
return NGX_ERROR;
}
cln->handler = (ngx_pool_cleanup_pt) ngx_http_push_stream_subscriber_cleanup;
clndata = (ngx_http_push_stream_subscriber_cleanup_t *) cln->data;
clndata->worker_subscriber = worker_subscriber;
worker_subscriber->clndata = clndata;
ngx_queue_init(&messages_to_sent_queue);
ngx_shmtx_lock(&shpool->mutex);
u_char *channel_pos = channels_path->data;
u_char *end = NULL, *slash_pos = NULL;
ngx_uint_t len = 0;
ngx_uint_t backtrack_messages = 0;
ngx_uint_t subscribed_channels_qtd = 0;
ngx_uint_t subscribed_broadcast_channels_qtd = 0;
// doing the parser of given channel path
while (channel_pos != NULL) {
end = channels_path->data + channels_path->len;
slash_pos = (u_char *) ngx_strstr(channel_pos, NGX_HTTP_PUSH_STREAM_SLASH.data);
if (slash_pos != NULL) {
end = slash_pos;
}
backtrack_messages = 0;
len = end - channel_pos;
u_char *backtrack_pos = (u_char *) ngx_strstr(channel_pos, NGX_HTTP_PUSH_STREAM_BACKTRACK_SEP.data);
if ((backtrack_pos != NULL) && (end > backtrack_pos)) {
len = backtrack_pos - channel_pos;
backtrack_pos = backtrack_pos + NGX_HTTP_PUSH_STREAM_BACKTRACK_SEP.len;
if (end > backtrack_pos) {
backtrack_messages = ngx_atoi(backtrack_pos, end - backtrack_pos);
}
}
if (len > 0) {
backtrack_messages = (backtrack_messages > 0) ? backtrack_messages : 0;
id->len = len;
ngx_memcpy(id->data, channel_pos, len);
*(id->data + id->len) = '\0';
ngx_int_t ret = ngx_http_push_stream_subscriber_assign_channel_locked(shpool, cf, r, id, backtrack_messages, &messages_to_sent_queue, worker_subscriber->subscriptions_sentinel, temp_pool);
if (ret != NGX_OK) {
// if get here the shpool already is unlocked
ngx_destroy_pool(temp_pool);
return ret;
}
subscribed_channels_qtd++;
if (cf->broadcast_channel_prefix.len > 0) {
u_char *broad_pos = (u_char *) ngx_strstr(channel_pos, cf->broadcast_channel_prefix.data);
if ((broad_pos != NULL) && (broad_pos == channel_pos)) {
subscribed_broadcast_channels_qtd++;
if (subscribed_broadcast_channels_qtd > (ngx_uint_t)cf->broadcast_channel_max_qtd) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_destroy_pool(temp_pool);
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: max subscribed broadcast channels exceeded");
return NGX_HTTP_FORBIDDEN;
}
}
}
}
channel_pos = NULL;
if (slash_pos != NULL) {
channel_pos = slash_pos + NGX_HTTP_PUSH_STREAM_SLASH.len;
}
}
ngx_shmtx_unlock(&shpool->mutex);
if ((subscribed_channels_qtd - subscribed_broadcast_channels_qtd) == 0) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: subscribe broadcast channel whithout subscribe a common channel");
ngx_destroy_pool(temp_pool);
return NGX_HTTP_FORBIDDEN;
}
r->read_event_handler = ngx_http_test_reading;
r->write_event_handler = ngx_http_request_empty_handler;
r->discard_body = 1;
#if defined nginx_version && nginx_version >= 8053
r->keepalive = 1;
#else
r->keepalive = 0;
#endif
r->headers_out.content_type = cf->content_type;
r->headers_out.status = NGX_HTTP_OK;
r->headers_out.content_length_n = -1;
ngx_http_push_stream_worker_data_t *workers_data = ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->ipc;
ngx_http_push_stream_worker_data_t *thisworker_data = workers_data + ngx_process_slot;
ngx_http_send_header(r);
ngx_http_push_stream_send_body_header(r, cf);
ngx_shmtx_lock(&shpool->mutex);
ngx_queue_insert_tail(&thisworker_data->worker_subscribers_sentinel->queue, &worker_subscriber->queue);
ngx_shmtx_unlock(&shpool->mutex);
ngx_http_push_stream_ping_timer_set(cf);
ngx_http_push_stream_disconnect_timer_set(cf);
// send old messages to subscriber
if (&messages_to_sent_queue != ngx_queue_next(&messages_to_sent_queue)) {
ngx_chain_t *chain = NULL;
ngx_int_t rc = NGX_OK;
NGX_HTTP_PUSH_STREAM_MAKE_IN_MEMORY_CHAIN(chain, temp_pool, "push stream module: unable to allocate chain to send old messages to new subscriber");
ngx_queue_t *message = ngx_queue_next(&messages_to_sent_queue);
while (&messages_to_sent_queue != message) {
ngx_http_push_stream_msg_t *msg = ((ngx_http_push_stream_msg_queue_t *) message)->msg;
chain->buf->pos = msg->buf->pos;
chain->buf->last = msg->buf->last;
chain->buf->start = msg->buf->start;
chain->buf->end = msg->buf->end;
rc = ngx_http_output_filter(r, chain);
if (rc == NGX_OK) {
rc = ngx_http_send_special(r, NGX_HTTP_FLUSH);
}
if (rc != NGX_OK) {
break;
}
message = ngx_queue_next(message);
}
}
ngx_destroy_pool(temp_pool);
return NGX_DONE;
}
static ngx_int_t
ngx_http_push_stream_broadcast_locked(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_int_t status_code, const ngx_str_t *status_line, ngx_log_t *log, ngx_slab_pool_t *shpool)
{
// subscribers are queued up in a local pool. Queue heads, however, are located
// in shared memory, identified by pid.
ngx_http_push_stream_pid_queue_t *sentinel = &channel->workers_with_subscribers;
ngx_http_push_stream_pid_queue_t *cur = sentinel;
ngx_int_t received;
received = channel->subscribers > 0 ? NGX_HTTP_PUSH_STREAM_MESSAGE_RECEIVED : NGX_HTTP_PUSH_STREAM_MESSAGE_QUEUED;
if ((msg != NULL) && (received == NGX_HTTP_PUSH_STREAM_MESSAGE_RECEIVED)) {
ngx_http_push_stream_reserve_message_locked(channel, msg);
}
while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
pid_t worker_pid = cur->pid;
ngx_int_t worker_slot = cur->slot;
ngx_http_push_stream_subscriber_t *subscriber_sentinel= cur->subscriber_sentinel;
ngx_shmtx_unlock(&shpool->mutex);
// interprocess communication breakdown
if (ngx_http_push_stream_send_worker_message(channel, subscriber_sentinel, worker_pid, worker_slot, msg, status_code, log) != NGX_ERROR) {
ngx_http_push_stream_alert_worker(worker_pid, worker_slot, log);
} else {
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: error communicating with some other worker process");
}
ngx_shmtx_lock(&shpool->mutex);
}
return received;
}
static ngx_int_t
ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscriber_t *sentinel, ngx_http_push_stream_msg_t *msg, ngx_int_t status_code, const ngx_str_t *status_line)
{
ngx_slab_pool_t *shpool = ngx_http_push_stream_shpool;
ngx_http_push_stream_subscriber_t *cur, *next;
ngx_int_t responded_subscribers = 0;
if (sentinel == NULL) {
return NGX_OK;
}
cur = (ngx_http_push_stream_subscriber_t *) ngx_queue_head(&sentinel->queue);
if (msg != NULL) {
// copy everything we need first
ngx_chain_t *chain;
ngx_http_request_t *r;
ngx_buf_t *buffer;
u_char *pos;
ngx_pool_t *temp_pool;
ngx_shmtx_lock(&shpool->mutex);
if ((temp_pool = ngx_create_pool(NGX_CYCLE_POOL_SIZE, ngx_http_push_stream_pool->log)) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, ngx_http_push_stream_pool->log, 0, "push stream module: unable to allocate memory for temporary pool");
return NGX_ERROR;
}
// preallocate output chain. yes, same one for every waiting subscriber
if ((chain = ngx_http_push_stream_create_output_chain_locked(msg->buf, temp_pool, ngx_cycle->log, shpool)) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push stream module: unable to create output chain while responding to several subscriber request");
ngx_destroy_pool(temp_pool);
return NGX_ERROR;
}
buffer = chain->buf;
pos = buffer->pos;
ngx_shmtx_unlock(&shpool->mutex);
buffer->last_buf = 0;
// now let's respond to some requests!
while (cur != sentinel) {
next = (ngx_http_push_stream_subscriber_t *) ngx_queue_next(&cur->queue);
// in this block, nothing in shared memory should be dereferenced
r = cur->request;
r->discard_body = 0; // hacky hacky!
ngx_http_output_filter(r, chain);
ngx_http_send_special(r, NGX_HTTP_FLUSH);
responded_subscribers++;
// rewind the buffer, please
buffer->pos = pos;
buffer->last_buf = 0;
cur = next;
}
// free everything relevant
if (buffer->file) {
ngx_close_file(buffer->file->fd);
}
if (responded_subscribers && !msg->persistent) {
ngx_shmtx_lock(&shpool->mutex);
// message deletion
ngx_http_push_stream_release_message_locked(channel, msg);
ngx_shmtx_unlock(&shpool->mutex);
}
ngx_destroy_pool(temp_pool);
} else {
// headers only probably
ngx_http_request_t *r;
while (cur != sentinel) {
next = (ngx_http_push_stream_subscriber_t *) ngx_queue_next(&cur->queue);
r = cur->request;
// cleanup oughtn't dequeue anything. or decrement the subscriber count, for that matter
cur->clndata->worker_subscriber = NULL;
ngx_http_push_stream_respond_status_only(r, status_code, status_line);
cur = next;
}
}
return NGX_OK;
}
static void
ngx_http_push_stream_subscriber_cleanup(ngx_http_push_stream_subscriber_cleanup_t *data)
{
if (data->worker_subscriber != NULL) {
ngx_shmtx_lock(&ngx_http_push_stream_shpool->mutex);
ngx_http_push_stream_worker_subscriber_cleanup_locked(data->worker_subscriber);
ngx_shmtx_unlock(&ngx_http_push_stream_shpool->mutex);
}
}
#include <ngx_http_push_stream_module.h>
#define NGX_HTTP_PUSH_STREAM_BUF_ALLOC_SIZE(buf) \
(sizeof(*buf) + \
(((buf)->temporary || (buf)->memory) ? ngx_buf_size(buf) : 0) + \
(((buf)->file!=NULL) ? (sizeof(*(buf)->file) + (buf)->file->name.len + 1) : 0))
#define NGX_HTTP_PUSH_STREAM_PUBLISHER_CHECK(val, fail, r, errormessage) \
if (val == fail) { \
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, errormessage); \
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); \
return; \
}
#define NGX_HTTP_PUSH_STREAM_PUBLISHER_CHECK_LOCKED(val, fail, r, errormessage, shpool) \
if (val == fail) { \
ngx_shmtx_unlock(&(shpool)->mutex); \
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, errormessage); \
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); \
return; \
}
#define NGX_HTTP_PUSH_STREAM_MAKE_IN_MEMORY_CHAIN(chain, pool, errormessage) \
if (chain == NULL) { \
ngx_buf_t *buffer; \
chain = ngx_pcalloc(pool, sizeof(ngx_chain_t)); \
buffer = ngx_pcalloc(pool, sizeof(ngx_buf_t)); \
if ((chain == NULL) || (buffer == NULL)) { \
ngx_log_error(NGX_LOG_ERR, pool->log, 0, errormessage); \
return NGX_ERROR; \
} \
buffer->pos = NULL; \
buffer->temporary = 0; \
buffer->memory = 1; \
buffer->last_buf = 0; \
chain->buf = buffer; \
chain->next = NULL; \
}
// buffer is _copied_
// if shpool is provided, it is assumed that shm it is locked
static ngx_chain_t *
ngx_http_push_stream_create_output_chain_general(ngx_buf_t *buf, ngx_pool_t *pool, ngx_log_t *log, ngx_slab_pool_t *shpool)
{
ngx_chain_t *out;
ngx_file_t *file;
if ((out = ngx_pcalloc(pool, sizeof(*out))) == NULL) {
return NULL;
}
ngx_buf_t *buf_copy;
if ((buf_copy = ngx_pcalloc(pool, NGX_HTTP_PUSH_STREAM_BUF_ALLOC_SIZE(buf))) == NULL) {
return NULL;
}
ngx_http_push_stream_copy_preallocated_buffer(buf, buf_copy);
if (buf->file != NULL) {
file = buf_copy->file;
file->log = log;
if (file->fd == NGX_INVALID_FILE) {
if (shpool) {
ngx_shmtx_unlock(&shpool->mutex);
file->fd = ngx_open_file(file->name.data, NGX_FILE_RDONLY, NGX_FILE_OPEN, NGX_FILE_OWNER_ACCESS);
ngx_shmtx_lock(&shpool->mutex);
} else {
file->fd = ngx_open_file(file->name.data, NGX_FILE_RDONLY, NGX_FILE_OPEN, NGX_FILE_OWNER_ACCESS);
}
}
if (file->fd == NGX_INVALID_FILE) {
return NULL;
}
}
buf_copy->last_buf = 1;
out->buf = buf_copy;
out->next = NULL;
return out;
}
static void
ngx_http_push_stream_copy_preallocated_buffer(ngx_buf_t *buf, ngx_buf_t *cbuf)
{
if (cbuf != NULL) {
ngx_memcpy(cbuf, buf, sizeof(*buf)); // overkill?
if (buf->temporary || buf->memory) { // we don't want to copy mmpapped memory, so no ngx_buf_in_momory(buf)
cbuf->pos = (u_char *) (cbuf + 1);
cbuf->last = cbuf->pos + ngx_buf_size(buf);
cbuf->start = cbuf->pos;
cbuf->end = cbuf->start + ngx_buf_size(buf);
ngx_memcpy(cbuf->pos, buf->pos, ngx_buf_size(buf));
cbuf->memory = ngx_buf_in_memory_only(buf) ? 1 : 0;
}
if (buf->file != NULL) {
cbuf->file = (ngx_file_t *) (cbuf + 1) + ((buf->temporary || buf->memory) ? ngx_buf_size(buf) : 0);
cbuf->file->fd = NGX_INVALID_FILE;
cbuf->file->log = NULL;
cbuf->file->offset = buf->file->offset;
cbuf->file->sys_offset = buf->file->sys_offset;
cbuf->file->name.len = buf->file->name.len;
cbuf->file->name.data = (u_char *) (cbuf->file + 1);
ngx_memcpy(cbuf->file->name.data, buf->file->name.data, buf->file->name.len);
}
}
}
// remove a message from queue and free all associated memory
// assumes shpool is already locked
static ngx_inline void
ngx_http_push_stream_general_delete_message_locked(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_int_t force, ngx_slab_pool_t *shpool)
{
if (msg == NULL) {
return;
}
if (!msg->persistent) {
if (channel != NULL) {
ngx_queue_remove(&msg->queue);
channel->stored_messages--;
}
if (msg->refcount <= 0 || force) {
// nobody needs this message, or we were forced at integer-point to delete
ngx_http_push_stream_free_message_locked(msg, shpool);
}
}
}
// free memory for a message
static ngx_inline void
ngx_http_push_stream_free_message_locked(ngx_http_push_stream_msg_t *msg, ngx_slab_pool_t *shpool)
{
if (msg->buf->file != NULL) {
ngx_shmtx_unlock(&shpool->mutex);
if (msg->buf->file->fd != NGX_INVALID_FILE) {
ngx_close_file(msg->buf->file->fd);
}
ngx_delete_file(msg->buf->file->name.data); // should I care about deletion errors? doubt it.
ngx_shmtx_lock(&shpool->mutex);
}
ngx_slab_free_locked(shpool, msg->buf); // separate block, remember?
ngx_slab_free_locked(shpool, msg);
}
// garbage-collecting slab allocator
void *
ngx_http_push_stream_slab_alloc_locked(size_t size)
{
void *p;
if ((p = ngx_slab_alloc_locked(ngx_http_push_stream_shpool, size)) == NULL) {
ngx_http_push_stream_channel_queue_t *ccur, *cnext;
ngx_uint_t collected = 0;
// failed. emergency garbage sweep, then collect channels
ngx_queue_init(&channel_gc_sentinel.queue);
ngx_http_push_stream_walk_rbtree(ngx_http_push_stream_channel_collector);
for(ccur=(ngx_http_push_stream_channel_queue_t *) ngx_queue_next(&channel_gc_sentinel.queue); ccur!=&channel_gc_sentinel; ccur=cnext) {
cnext = (ngx_http_push_stream_channel_queue_t *) ngx_queue_next(&ccur->queue);
ngx_http_push_stream_delete_channel_locked(ccur->channel);
ngx_free(ccur);
collected++;
}
// TODO: collect worker messages maybe
#if (NGX_DEBUG)
// only enable this log in debug mode
ngx_log_error(NGX_LOG_WARN, ngx_cycle->log, 0, "push module: out of shared memory. emergency garbage collection deleted %ui unused channels.", collected);
#endif
return ngx_slab_alloc_locked(ngx_http_push_stream_shpool, size);
}
return p;
}
//shpool must be locked. No memory is freed. O(1)
static ngx_http_push_stream_msg_t *
ngx_http_push_stream_get_oldest_message_locked(ngx_http_push_stream_channel_t *channel)
{
ngx_queue_t *sentinel = &channel->message_queue->queue;
if (ngx_queue_empty(sentinel)) {
return NULL;
}
ngx_queue_t *qmsg = ngx_queue_head(sentinel);
return ngx_queue_data(qmsg, ngx_http_push_stream_msg_t, queue);
}
static ngx_int_t
ngx_http_push_stream_channel_collector(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool)
{
if ((ngx_http_push_stream_clean_channel_locked(channel)) != NULL) { // we're up for deletion
ngx_http_push_stream_channel_queue_t *trashy;
if ((trashy = ngx_alloc(sizeof(*trashy), ngx_cycle->log)) != NULL) {
// yeah, i'm allocating memory during garbage collection. sue me.
trashy->channel = channel;
ngx_queue_insert_tail(&channel_gc_sentinel.queue, &trashy->queue);
return NGX_OK;
}
return NGX_ERROR;
}
return NGX_OK;
}
static ngx_table_elt_t *
ngx_http_push_stream_add_response_header(ngx_http_request_t *r, const ngx_str_t *header_name, const ngx_str_t *header_value)
{
ngx_table_elt_t *h = ngx_list_push(&r->headers_out.headers);
if (h == NULL) {
return NULL;
}
h->hash = 1;
h->key.len = header_name->len;
h->key.data = header_name->data;
h->value.len = header_value->len;
h->value.data = header_value->data;
return h;
}
static ngx_int_t
ngx_http_push_stream_send_body_header(ngx_http_request_t *r, ngx_http_push_stream_loc_conf_t *pslcf)
{
ngx_int_t rc = NGX_OK;
if (pslcf->header_template.len > 0) {
ngx_http_push_stream_header_chain->buf->pos = pslcf->header_template.data;
ngx_http_push_stream_header_chain->buf->last = pslcf->header_template.data + pslcf->header_template.len;
ngx_http_push_stream_header_chain->buf->start = ngx_http_push_stream_header_chain->buf->pos;
ngx_http_push_stream_header_chain->buf->end = ngx_http_push_stream_header_chain->buf->last;
rc = ngx_http_output_filter(r, ngx_http_push_stream_header_chain);
if (rc == NGX_OK) {
ngx_http_push_stream_crlf_chain->buf->pos = NGX_HTTP_PUSH_STREAM_CRLF.data;
ngx_http_push_stream_crlf_chain->buf->last = NGX_HTTP_PUSH_STREAM_CRLF.data + NGX_HTTP_PUSH_STREAM_CRLF.len;
ngx_http_push_stream_crlf_chain->buf->start = ngx_http_push_stream_crlf_chain->buf->pos;
ngx_http_push_stream_crlf_chain->buf->end = ngx_http_push_stream_crlf_chain->buf->last;
rc = ngx_http_output_filter(r, ngx_http_push_stream_crlf_chain);
if (rc == NGX_OK) {
rc = ngx_http_send_special(r, NGX_HTTP_FLUSH);
}
}
}
return rc;
}
static ngx_int_t
ngx_http_push_stream_send_ping(ngx_log_t *log, ngx_http_push_stream_loc_conf_t *pslcf)
{
if (pslcf->message_template.len > 0) {
if (ngx_http_push_stream_ping_buf == NULL) {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_shmtx_lock(&shpool->mutex);
if (ngx_http_push_stream_ping_buf == NULL) {
ngx_buf_t *buf = NULL;
ngx_pool_t *temp_pool = NULL;
if ((temp_pool = ngx_create_pool(NGX_CYCLE_POOL_SIZE, log)) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: unable to allocate memory for temporary pool");
return NGX_ERROR;
}
if ((buf = ngx_http_push_stream_get_formatted_message_locked(pslcf, NULL, NULL, temp_pool)) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, ngx_http_push_stream_pool->log, 0, "push stream module: unable to format ping message");
ngx_destroy_pool(temp_pool);
return NGX_ERROR;
}
if ((ngx_http_push_stream_ping_buf = ngx_http_push_stream_slab_alloc_locked(NGX_HTTP_PUSH_STREAM_BUF_ALLOC_SIZE(buf))) == NULL) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_log_error(NGX_LOG_ERR, ngx_http_push_stream_pool->log, 0, "push stream module: unable to allocate memory for formatted ping message");
ngx_destroy_pool(temp_pool);
return NGX_ERROR;
}
ngx_http_push_stream_copy_preallocated_buffer(buf, ngx_http_push_stream_ping_buf);
ngx_http_push_stream_ping_msg->buf = ngx_http_push_stream_ping_buf;
ngx_destroy_pool(temp_pool);
}
ngx_shmtx_unlock(&shpool->mutex);
}
ngx_http_push_stream_alert_worker_send_ping(ngx_pid, ngx_process_slot, ngx_http_push_stream_pool->log);
}
return NGX_OK;
}
static void
ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev)
{
ngx_http_push_stream_loc_conf_t *pslcf = ev->data;
ngx_http_push_stream_send_ping(ev->log, pslcf);
ngx_http_push_stream_ping_timer_reset(pslcf);
}
static void
ngx_http_push_stream_ping_timer_set(ngx_http_push_stream_loc_conf_t *pslcf)
{
if ((pslcf->message_template.len > 0) && (pslcf->ping_message_interval != NGX_CONF_UNSET_MSEC)) {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
if (ngx_http_push_stream_ping_event.handler == NULL) {
ngx_shmtx_lock(&shpool->mutex);
if (ngx_http_push_stream_ping_event.handler == NULL) {
ngx_http_push_stream_ping_event.handler = ngx_http_push_stream_ping_timer_wake_handler;
ngx_http_push_stream_ping_event.data = pslcf;
ngx_http_push_stream_ping_event.log = ngx_http_push_stream_pool->log;
ngx_http_push_stream_ping_timer_reset(pslcf);
}
ngx_shmtx_unlock(&shpool->mutex);
}
}
}
static void
ngx_http_push_stream_ping_timer_reset(ngx_http_push_stream_loc_conf_t *pslcf)
{
if ((pslcf->message_template.len > 0) && (pslcf->ping_message_interval != NGX_CONF_UNSET_MSEC)) {
if (ngx_http_push_stream_ping_event.timedout) {
#if defined nginx_version && nginx_version >= 7066
ngx_time_update();
#else
ngx_time_update(0, 0);
#endif
}
ngx_add_timer(&ngx_http_push_stream_ping_event, pslcf->ping_message_interval);
}
}
static void
ngx_http_push_stream_disconnect_timer_set(ngx_http_push_stream_loc_conf_t *pslcf)
{
if (pslcf->subscriber_disconnect_interval != NGX_CONF_UNSET_MSEC) {
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
if (ngx_http_push_stream_disconnect_event.handler == NULL) {
ngx_shmtx_lock(&shpool->mutex);
if (ngx_http_push_stream_disconnect_event.handler == NULL) {
ngx_http_push_stream_disconnect_event.handler = ngx_http_push_stream_disconnect_timer_wake_handler;
ngx_http_push_stream_disconnect_event.data = pslcf;
ngx_http_push_stream_disconnect_event.log = ngx_http_push_stream_pool->log;
ngx_http_push_stream_disconnect_timer_reset(pslcf);
}
ngx_shmtx_unlock(&shpool->mutex);
}
}
}
static void
ngx_http_push_stream_disconnect_timer_reset(ngx_http_push_stream_loc_conf_t *pslcf)
{
if (pslcf->subscriber_disconnect_interval != NGX_CONF_UNSET_MSEC) {
if (ngx_http_push_stream_disconnect_event.timedout) {
#if defined nginx_version && nginx_version >= 7066
ngx_time_update();
#else
ngx_time_update(0, 0);
#endif
}
ngx_add_timer(&ngx_http_push_stream_disconnect_event, pslcf->subscriber_disconnect_interval);
}
}
static void
ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev)
{
ngx_http_push_stream_loc_conf_t *pslcf = ev->data;
ngx_http_push_stream_alert_worker_disconnect_subscribers(ngx_pid, ngx_process_slot, ngx_http_push_stream_pool->log);
ngx_http_push_stream_disconnect_timer_reset(pslcf);
}
static u_char *
ngx_http_push_stream_str_replace_locked(u_char *org, u_char *find, u_char *replace, ngx_pool_t *pool)
{
ngx_uint_t len_org = ngx_strlen(org);
ngx_uint_t len_find = ngx_strlen(find);
ngx_uint_t len_replace = ngx_strlen(replace);
u_char *result = org;
if (len_find > 0) {
u_char *ret = (u_char *) ngx_strstr(org, find);
if (ret != NULL) {
u_char *tmp = ngx_pcalloc(pool,len_org + len_replace + len_find);
u_int len_found = ret-org;
ngx_memcpy(tmp, org, len_found);
ngx_memcpy(tmp + len_found, replace, len_replace);
ngx_memcpy(tmp + len_found + len_replace, org + len_found + len_find, len_org - len_found - len_find);
result = ngx_http_push_stream_str_replace_locked(tmp, find, replace, pool);
}
}
return result;
}
static ngx_buf_t *
ngx_http_push_stream_get_formatted_message_locked(ngx_http_push_stream_loc_conf_t *pslcf, ngx_http_push_stream_channel_t *channel, ngx_buf_t *buf, ngx_pool_t *pool)
{
if (buf != NULL) {
// ensure the final string in a reusable buffer
*buf->last = '\0';
buf->temporary = 1;
buf->memory = 1;
}
if (pslcf->message_template.len > 0) {
u_char template_with_crlf[pslcf->message_template.len + NGX_HTTP_PUSH_STREAM_CRLF.len + 1];
ngx_memcpy(template_with_crlf, pslcf->message_template.data, pslcf->message_template.len);
ngx_memcpy(template_with_crlf + pslcf->message_template.len, NGX_HTTP_PUSH_STREAM_CRLF.data, NGX_HTTP_PUSH_STREAM_CRLF.len);
template_with_crlf[pslcf->message_template.len + NGX_HTTP_PUSH_STREAM_CRLF.len] = '\0';
u_char char_id[10];
u_char *msg = NGX_PUSH_STREAM_PING_MESSAGE_TEXT.data, *channel_id = NGX_PUSH_STREAM_PING_CHANNEL_ID.data;
if ((channel != NULL) && (buf != NULL)) {
ngx_memzero(char_id, sizeof(char_id));
ngx_sprintf(char_id, "%d", channel->last_message_id + 1);
msg = buf->pos;
channel_id = channel->id.data;
} else {
ngx_memcpy(char_id, NGX_PUSH_STREAM_PING_MESSAGE_ID.data, NGX_PUSH_STREAM_PING_MESSAGE_ID.len + 1);
}
u_char *txt = ngx_http_push_stream_str_replace_locked(template_with_crlf, NGX_PUSH_STREAM_TOKEN_MESSAGE_ID.data, char_id, pool);
txt = ngx_http_push_stream_str_replace_locked(txt, NGX_PUSH_STREAM_TOKEN_MESSAGE_CHANNEL.data, channel_id, pool);
txt = ngx_http_push_stream_str_replace_locked(txt, NGX_PUSH_STREAM_TOKEN_MESSAGE_TEXT.data, msg, pool);
ngx_buf_t *buf_msg = ngx_calloc_buf(pool);
buf_msg->pos = txt;
buf_msg->last = buf_msg->pos + ngx_strlen(txt) + 1;
buf_msg->start = buf_msg->pos;
buf_msg->end = buf_msg->last;
buf_msg->temporary = 1;
buf_msg->memory = 1;
return buf_msg;
} else if (buf != NULL) {
ngx_uint_t len_org = ngx_buf_size(buf);
ngx_uint_t len = len_org + NGX_HTTP_PUSH_STREAM_CRLF.len + 1;
u_char *txt_with_crlf = ngx_pcalloc(pool, len);
ngx_memcpy(txt_with_crlf, buf->pos, len_org);
ngx_memcpy(txt_with_crlf + len_org, NGX_HTTP_PUSH_STREAM_CRLF.data, NGX_HTTP_PUSH_STREAM_CRLF.len);
buf->pos = txt_with_crlf;
buf->last = buf->pos + len;
buf->start = buf->pos;
buf->end = buf->last;
*buf->last = '\0';
}
return buf;
}
static void
ngx_http_push_stream_worker_subscriber_cleanup_locked(ngx_http_push_stream_worker_subscriber_t *worker_subscriber)
{
ngx_http_push_stream_subscription_t *cur, *next, *sentinel;
sentinel = worker_subscriber->subscriptions_sentinel;
cur = (ngx_http_push_stream_subscription_t *) ngx_queue_head(&sentinel->queue);
while (cur != sentinel) {
next = (ngx_http_push_stream_subscription_t *) ngx_queue_next(&cur->queue);
cur->channel->subscribers--;
ngx_queue_remove(&cur->subscriber->queue);
ngx_queue_remove(&cur->queue);
cur = next;
}
ngx_queue_init(&sentinel->queue);
ngx_queue_remove(&worker_subscriber->queue);
ngx_queue_init(&worker_subscriber->queue);
worker_subscriber->clndata->worker_subscriber = NULL;
}
#include <ngx_http_push_stream_module.h>
static void ngx_rbtree_generic_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel, int (*compare) (const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right));
static void ngx_http_push_stream_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
static int ngx_http_push_stream_compare_rbtree_node(const ngx_rbtree_node_t *v_left, const ngx_rbtree_node_t *v_right);
static ngx_int_t ngx_http_push_stream_delete_node_locked(ngx_rbtree_t *tree, ngx_rbtree_node_t *trash, ngx_slab_pool_t *shpool);
static ngx_http_push_stream_channel_t *
ngx_http_push_stream_clean_channel_locked(ngx_http_push_stream_channel_t *channel)
{
ngx_queue_t *sentinel = &channel->message_queue->queue;
time_t now = ngx_time();
ngx_http_push_stream_msg_t *msg = NULL;
while (!ngx_queue_empty(sentinel)) {
msg = ngx_queue_data(ngx_queue_head(sentinel), ngx_http_push_stream_msg_t, queue);
if (msg != NULL && msg->expires != 0 && now > msg->expires) {
ngx_http_push_stream_delete_message_locked(channel, msg, ngx_http_push_stream_shpool);
} else { // definitely a message left to send
return NULL;
}
}
// at this point, the queue is empty
return channel->subscribers == 0 ? channel : NULL; // if no waiting requests, return this channel to be deleted
}
static ngx_int_t
ngx_http_push_stream_delete_channel_locked(ngx_http_push_stream_channel_t *trash)
{
ngx_int_t res;
res = ngx_http_push_stream_delete_node_locked(&((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->tree, (ngx_rbtree_node_t *) trash, ngx_http_push_stream_shpool);
if (res == NGX_OK) {
((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->channels--;
return NGX_OK;
}
return res;
}
static ngx_int_t
ngx_http_push_stream_delete_node_locked(ngx_rbtree_t *tree, ngx_rbtree_node_t *trash, ngx_slab_pool_t *shpool)
{
// assume the shm zone is already locked
if (trash != NULL) { // take out the trash
ngx_rbtree_delete(tree, trash);
// delete the worker-subscriber queue
ngx_queue_t *sentinel = (ngx_queue_t *) (&((ngx_http_push_stream_channel_t *) trash)->workers_with_subscribers);
ngx_queue_t *cur = ngx_queue_head(sentinel);
ngx_queue_t *next;
while (cur != sentinel) {
next = ngx_queue_next(cur);
ngx_slab_free_locked(shpool, cur);
cur = next;
}
ngx_slab_free_locked(shpool, trash);
return NGX_OK;
}
return NGX_DECLINED;
}
static ngx_http_push_stream_channel_t *
ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log)
{
ngx_rbtree_t *tree = &((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->tree;
uint32_t hash;
ngx_rbtree_node_t *node, *sentinel;
ngx_int_t rc;
ngx_http_push_stream_channel_t *up = NULL;
ngx_http_push_stream_channel_t *trash[] = { NULL, NULL, NULL };
ngx_uint_t i, trashed = 0;
if (tree == NULL) {
return NULL;
}
hash = ngx_crc32_short(id->data, id->len);
node = tree->root;
sentinel = tree->sentinel;
while (node != sentinel) {
// every search is responsible for deleting a couple of empty, if it comes across them
if (trashed < (sizeof(trash) / sizeof(*trash))) {
if ((trash[trashed] = ngx_http_push_stream_clean_channel_locked((ngx_http_push_stream_channel_t *) node)) != NULL) {
trashed++;
}
}
if (hash < node->key) {
node = node->left;
continue;
}
if (hash > node->key) {
node = node->right;
continue;
}
/* hash == node->key */
do {
up = (ngx_http_push_stream_channel_t *) node;
rc = ngx_memn2cmp(id->data, up->id.data, id->len, up->id.len);
if (rc == 0) {
// found
for(i=0; i<trashed; i++) {
if (trash[i] != up) { // take out the trash
ngx_http_push_stream_delete_channel_locked(trash[i]);
}
}
ngx_http_push_stream_clean_channel_locked(up);
return up;
}
node = (rc < 0) ? node->left : node->right;
} while (node != sentinel && hash == node->key);
break;
}
// not found
for(i=0; i<trashed; i++) {
ngx_http_push_stream_delete_channel_locked(trash[i]);
}
return NULL;
}
// find a channel by id. if channel not found, make one, insert it, and return that.
static ngx_http_push_stream_channel_t *
ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log)
{
ngx_rbtree_t *tree;
ngx_http_push_stream_channel_t *up = ngx_http_push_stream_find_channel(id, log);
if (up != NULL) { // we found our channel
return up;
}
tree = &((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->tree;
if ((up = ngx_http_push_stream_slab_alloc_locked(sizeof(*up) + id->len + 1 + sizeof(ngx_http_push_stream_msg_t))) == NULL) {
return NULL;
}
up->id.data = (u_char *) (up+1); // contiguous piggy
up->message_queue = (ngx_http_push_stream_msg_t *) (up->id.data + id->len + 1);
up->id.len = (u_char) id->len;
ngx_memzero(up->id.data, up->id.len + 1);
ngx_memcpy(up->id.data, id->data, up->id.len);
up->node.key = ngx_crc32_short(id->data, id->len);
ngx_rbtree_insert(tree, (ngx_rbtree_node_t *) up);
// initialize queues
ngx_queue_init(&up->message_queue->queue);
up->last_message_id = 0;
up->stored_messages = 0;
ngx_queue_init(&up->workers_with_subscribers.queue);
up->subscribers = 0;
((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->channels++;
return up;
}
static void
ngx_rbtree_generic_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel, int (*compare) (const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right))
{
for (;;) {
if (node->key < temp->key) {
if (temp->left == sentinel) {
temp->left = node;
break;
}
temp = temp->left;
} else if (node->key > temp->key) {
if (temp->right == sentinel) {
temp->right = node;
break;
}
temp = temp->right;
} else { /* node->key == temp->key */
if (compare(node, temp) < 0) {
if (temp->left == sentinel) {
temp->left = node;
break;
}
temp = temp->left;
} else {
if (temp->right == sentinel) {
temp->right = node;
break;
}
temp = temp->right;
}
}
}
node->parent = temp;
node->left = sentinel;
node->right = sentinel;
ngx_rbt_red(node);
}
#define ngx_http_push_stream_walk_rbtree(apply) \
ngx_http_push_stream_rbtree_walker(&((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->tree, (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr, apply, ((ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data)->tree.root)
static void
ngx_http_push_stream_rbtree_walker(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_int_t (*apply) (ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool), ngx_rbtree_node_t *node)
{
ngx_rbtree_node_t *sentinel = tree->sentinel;
if (node != sentinel) {
apply((ngx_http_push_stream_channel_t *) node, shpool);
if (node->left != NULL) {
ngx_http_push_stream_rbtree_walker(tree, shpool, apply, node->left);
}
if (node->right != NULL) {
ngx_http_push_stream_rbtree_walker(tree, shpool, apply, node->right);
}
}
}
static void
ngx_http_push_stream_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel)
{
ngx_rbtree_generic_insert(temp, node, sentinel, ngx_http_push_stream_compare_rbtree_node);
}
static int
ngx_http_push_stream_compare_rbtree_node(const ngx_rbtree_node_t *v_left, const ngx_rbtree_node_t *v_right)
{
ngx_http_push_stream_channel_t *left = (ngx_http_push_stream_channel_t *) v_left, *right = (ngx_http_push_stream_channel_t *) v_right;
return ngx_memn2cmp(left->id.data, right->id.data, left->id.len, right->id.len);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment