Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nginx-push-stream-module
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Administrator
nginx-push-stream-module
Commits
f79baac0
Commit
f79baac0
authored
Oct 23, 2011
by
Wandenberg Peixoto
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
split validate channel code in function ngx_http_push_stream_validate_channels to be reusable
parent
57c6ff7d
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
86 additions
and
63 deletions
+86
-63
ngx_http_push_stream_module_subscriber.h
include/ngx_http_push_stream_module_subscriber.h
+2
-1
ngx_http_push_stream_module_subscriber.c
src/ngx_http_push_stream_module_subscriber.c
+84
-62
No files found.
include/ngx_http_push_stream_module_subscriber.h
View file @
f79baac0
...
@@ -33,7 +33,8 @@ typedef struct {
...
@@ -33,7 +33,8 @@ typedef struct {
}
ngx_http_push_stream_requested_channel_t
;
}
ngx_http_push_stream_requested_channel_t
;
static
ngx_int_t
ngx_http_push_stream_subscriber_handler
(
ngx_http_request_t
*
r
);
static
ngx_int_t
ngx_http_push_stream_subscriber_handler
(
ngx_http_request_t
*
r
);
ngx_http_push_stream_requested_channel_t
*
ngx_http_push_stream_parse_channels_ids_from_path
(
ngx_http_request_t
*
r
,
ngx_pool_t
*
pool
);
ngx_http_push_stream_requested_channel_t
*
ngx_http_push_stream_parse_channels_ids_from_path
(
ngx_http_request_t
*
r
,
ngx_pool_t
*
pool
);
static
ngx_int_t
ngx_http_push_stream_validate_channels
(
ngx_http_request_t
*
r
,
ngx_http_push_stream_requested_channel_t
*
channels_ids
,
ngx_int_t
*
status_code
,
ngx_str_t
**
explain_error_message
);
static
void
ngx_http_push_stream_subscriber_cleanup
(
ngx_http_push_stream_subscriber_cleanup_t
*
data
);
static
void
ngx_http_push_stream_subscriber_cleanup
(
ngx_http_push_stream_subscriber_cleanup_t
*
data
);
...
...
src/ngx_http_push_stream_module_subscriber.c
View file @
f79baac0
...
@@ -43,16 +43,13 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
...
@@ -43,16 +43,13 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_http_push_stream_subscriber_t
*
worker_subscriber
;
ngx_http_push_stream_subscriber_t
*
worker_subscriber
;
ngx_http_push_stream_requested_channel_t
*
channels_ids
,
*
cur
;
ngx_http_push_stream_requested_channel_t
*
channels_ids
,
*
cur
;
ngx_pool_t
*
temp_pool
;
ngx_pool_t
*
temp_pool
;
ngx_uint_t
subscribed_channels_qtd
=
0
;
ngx_uint_t
subscribed_broadcast_channels_qtd
=
0
;
ngx_flag_t
is_broadcast_channel
;
ngx_http_push_stream_channel_t
*
channel
;
time_t
if_modified_since
;
time_t
if_modified_since
;
ngx_str_t
*
last_event_id
;
ngx_str_t
*
last_event_id
;
ngx_str_t
*
push_mode
;
ngx_str_t
*
push_mode
;
ngx_flag_t
polling
,
longpolling
;
ngx_flag_t
polling
,
longpolling
;
ngx_http_push_stream_main_conf_t
*
mcf
=
ngx_http_push_stream_module_main_conf
;
ngx_int_t
rc
;
ngx_int_t
rc
;
ngx_int_t
status_code
;
ngx_str_t
*
explain_error_message
;
// only accept GET method
// only accept GET method
if
(
!
(
r
->
method
&
NGX_HTTP_GET
))
{
if
(
!
(
r
->
method
&
NGX_HTTP_GET
))
{
...
@@ -75,64 +72,9 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
...
@@ -75,64 +72,9 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
}
}
//validate channels: name, length and quantity. check if channel exists when authorized_channels_only is on. check if channel is full of subscribers
//validate channels: name, length and quantity. check if channel exists when authorized_channels_only is on. check if channel is full of subscribers
cur
=
channels_ids
;
if
(
ngx_http_push_stream_validate_channels
(
r
,
channels_ids
,
&
status_code
,
&
explain_error_message
)
==
NGX_ERROR
)
{
while
((
cur
=
(
ngx_http_push_stream_requested_channel_t
*
)
ngx_queue_next
(
&
cur
->
queue
))
!=
channels_ids
)
{
// could not be ALL channel or contain wildcard
if
((
ngx_memn2cmp
(
cur
->
id
->
data
,
NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID
.
data
,
cur
->
id
->
len
,
NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID
.
len
)
==
0
)
||
(
ngx_strchr
(
cur
->
id
->
data
,
'*'
)
!=
NULL
))
{
ngx_destroy_pool
(
temp_pool
);
return
ngx_http_push_stream_send_only_header_response
(
r
,
NGX_HTTP_FORBIDDEN
,
&
NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_NOT_AUTHORIZED_MESSAGE
);
}
// could not have a large size
if
((
mcf
->
max_channel_id_length
!=
NGX_CONF_UNSET_UINT
)
&&
(
cur
->
id
->
len
>
mcf
->
max_channel_id_length
))
{
ngx_log_error
(
NGX_LOG_WARN
,
r
->
connection
->
log
,
0
,
"push stream module: channel id is larger than allowed %d"
,
cur
->
id
->
len
);
ngx_destroy_pool
(
temp_pool
);
return
ngx_http_push_stream_send_only_header_response
(
r
,
NGX_HTTP_BAD_REQUEST
,
&
NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE
);
}
// count subscribed channel and broadcasts
subscribed_channels_qtd
++
;
is_broadcast_channel
=
0
;
if
((
mcf
->
broadcast_channel_prefix
.
len
>
0
)
&&
(
ngx_strncmp
(
cur
->
id
->
data
,
mcf
->
broadcast_channel_prefix
.
data
,
mcf
->
broadcast_channel_prefix
.
len
)
==
0
))
{
is_broadcast_channel
=
1
;
subscribed_broadcast_channels_qtd
++
;
}
// check if channel exists when authorized_channels_only is on
if
(
cf
->
authorized_channels_only
&&
!
is_broadcast_channel
&&
(((
channel
=
ngx_http_push_stream_find_channel
(
cur
->
id
,
r
->
connection
->
log
))
==
NULL
)
||
(
channel
->
stored_messages
==
0
)))
{
ngx_destroy_pool
(
temp_pool
);
return
ngx_http_push_stream_send_only_header_response
(
r
,
NGX_HTTP_FORBIDDEN
,
&
NGX_HTTP_PUSH_STREAM_CANNOT_CREATE_CHANNELS
);
}
// check if channel is full of subscribers
if
((
mcf
->
max_subscribers_per_channel
!=
NGX_CONF_UNSET_UINT
)
&&
(((
channel
=
ngx_http_push_stream_find_channel
(
cur
->
id
,
r
->
connection
->
log
))
!=
NULL
)
&&
(
channel
->
subscribers
>=
mcf
->
max_subscribers_per_channel
)))
{
ngx_destroy_pool
(
temp_pool
);
return
ngx_http_push_stream_send_only_header_response
(
r
,
NGX_HTTP_FORBIDDEN
,
&
NGX_HTTP_PUSH_STREAM_TOO_SUBSCRIBERS_PER_CHANNEL
);
}
}
// check if number of subscribed broadcast channels is acceptable
if
((
cf
->
broadcast_channel_max_qtd
!=
NGX_CONF_UNSET_UINT
)
&&
(
subscribed_broadcast_channels_qtd
>
0
)
&&
((
subscribed_broadcast_channels_qtd
>
cf
->
broadcast_channel_max_qtd
)
||
(
subscribed_broadcast_channels_qtd
==
subscribed_channels_qtd
)))
{
ngx_log_error
(
NGX_LOG_ERR
,
r
->
connection
->
log
,
0
,
"push stream module: max subscribed broadcast channels exceeded"
);
ngx_destroy_pool
(
temp_pool
);
ngx_destroy_pool
(
temp_pool
);
return
ngx_http_push_stream_send_only_header_response
(
r
,
NGX_HTTP_FORBIDDEN
,
&
NGX_HTTP_PUSH_STREAM_TOO_MUCH_BROADCAST_CHANNELS
);
return
ngx_http_push_stream_send_only_header_response
(
r
,
status_code
,
explain_error_message
);
}
// create the channels in advance, if doesn't exist, to ensure max number of channels in the server
cur
=
channels_ids
;
while
((
cur
=
(
ngx_http_push_stream_requested_channel_t
*
)
ngx_queue_next
(
&
cur
->
queue
))
!=
channels_ids
)
{
channel
=
ngx_http_push_stream_get_channel
(
cur
->
id
,
r
->
connection
->
log
,
cf
);
if
(
channel
==
NULL
)
{
ngx_log_error
(
NGX_LOG_ERR
,
(
r
)
->
connection
->
log
,
0
,
"push stream module: unable to allocate memory for new channel"
);
ngx_destroy_pool
(
temp_pool
);
return
ngx_http_push_stream_send_only_header_response
(
r
,
NGX_HTTP_INTERNAL_SERVER_ERROR
,
NULL
);
}
if
(
channel
==
NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED
)
{
ngx_log_error
(
NGX_LOG_ERR
,
(
r
)
->
connection
->
log
,
0
,
"push stream module: number of channels were exceeded"
);
ngx_destroy_pool
(
temp_pool
);
return
ngx_http_push_stream_send_only_header_response
(
r
,
NGX_HTTP_FORBIDDEN
,
&
NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE
);
}
}
}
// get control headers
// get control headers
...
@@ -423,6 +365,86 @@ ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_poo
...
@@ -423,6 +365,86 @@ ngx_http_push_stream_parse_channels_ids_from_path(ngx_http_request_t *r, ngx_poo
return
channels_ids
;
return
channels_ids
;
}
}
static
ngx_int_t
ngx_http_push_stream_validate_channels
(
ngx_http_request_t
*
r
,
ngx_http_push_stream_requested_channel_t
*
channels_ids
,
ngx_int_t
*
status_code
,
ngx_str_t
**
explain_error_message
)
{
ngx_http_push_stream_main_conf_t
*
mcf
=
ngx_http_push_stream_module_main_conf
;
ngx_http_push_stream_loc_conf_t
*
cf
=
ngx_http_get_module_loc_conf
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_requested_channel_t
*
cur
=
channels_ids
;
ngx_uint_t
subscribed_channels_qtd
=
0
;
ngx_uint_t
subscribed_broadcast_channels_qtd
=
0
;
ngx_flag_t
is_broadcast_channel
;
ngx_http_push_stream_channel_t
*
channel
;
while
((
cur
=
(
ngx_http_push_stream_requested_channel_t
*
)
ngx_queue_next
(
&
cur
->
queue
))
!=
channels_ids
)
{
// could not be ALL channel or contain wildcard
if
((
ngx_memn2cmp
(
cur
->
id
->
data
,
NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID
.
data
,
cur
->
id
->
len
,
NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID
.
len
)
==
0
)
||
(
ngx_strchr
(
cur
->
id
->
data
,
'*'
)
!=
NULL
))
{
*
status_code
=
NGX_HTTP_FORBIDDEN
;
*
explain_error_message
=
(
ngx_str_t
*
)
&
NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_NOT_AUTHORIZED_MESSAGE
;
return
NGX_ERROR
;
}
// could not have a large size
if
((
mcf
->
max_channel_id_length
!=
NGX_CONF_UNSET_UINT
)
&&
(
cur
->
id
->
len
>
mcf
->
max_channel_id_length
))
{
ngx_log_error
(
NGX_LOG_WARN
,
r
->
connection
->
log
,
0
,
"push stream module: channel id is larger than allowed %d"
,
cur
->
id
->
len
);
*
status_code
=
NGX_HTTP_BAD_REQUEST
;
*
explain_error_message
=
(
ngx_str_t
*
)
&
NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE
;
return
NGX_ERROR
;
}
// count subscribed channel and broadcasts
subscribed_channels_qtd
++
;
is_broadcast_channel
=
0
;
if
((
mcf
->
broadcast_channel_prefix
.
len
>
0
)
&&
(
ngx_strncmp
(
cur
->
id
->
data
,
mcf
->
broadcast_channel_prefix
.
data
,
mcf
->
broadcast_channel_prefix
.
len
)
==
0
))
{
is_broadcast_channel
=
1
;
subscribed_broadcast_channels_qtd
++
;
}
// check if channel exists when authorized_channels_only is on
if
(
cf
->
authorized_channels_only
&&
!
is_broadcast_channel
&&
(((
channel
=
ngx_http_push_stream_find_channel
(
cur
->
id
,
r
->
connection
->
log
))
==
NULL
)
||
(
channel
->
stored_messages
==
0
)))
{
*
status_code
=
NGX_HTTP_FORBIDDEN
;
*
explain_error_message
=
(
ngx_str_t
*
)
&
NGX_HTTP_PUSH_STREAM_CANNOT_CREATE_CHANNELS
;
return
NGX_ERROR
;
}
// check if channel is full of subscribers
if
((
mcf
->
max_subscribers_per_channel
!=
NGX_CONF_UNSET_UINT
)
&&
(((
channel
=
ngx_http_push_stream_find_channel
(
cur
->
id
,
r
->
connection
->
log
))
!=
NULL
)
&&
(
channel
->
subscribers
>=
mcf
->
max_subscribers_per_channel
)))
{
*
status_code
=
NGX_HTTP_FORBIDDEN
;
*
explain_error_message
=
(
ngx_str_t
*
)
&
NGX_HTTP_PUSH_STREAM_TOO_SUBSCRIBERS_PER_CHANNEL
;
return
NGX_ERROR
;
}
}
// check if number of subscribed broadcast channels is acceptable
if
((
cf
->
broadcast_channel_max_qtd
!=
NGX_CONF_UNSET_UINT
)
&&
(
subscribed_broadcast_channels_qtd
>
0
)
&&
((
subscribed_broadcast_channels_qtd
>
cf
->
broadcast_channel_max_qtd
)
||
(
subscribed_broadcast_channels_qtd
==
subscribed_channels_qtd
)))
{
ngx_log_error
(
NGX_LOG_ERR
,
r
->
connection
->
log
,
0
,
"push stream module: max subscribed broadcast channels exceeded"
);
*
status_code
=
NGX_HTTP_FORBIDDEN
;
*
explain_error_message
=
(
ngx_str_t
*
)
&
NGX_HTTP_PUSH_STREAM_TOO_MUCH_BROADCAST_CHANNELS
;
return
NGX_ERROR
;
}
// create the channels in advance, if doesn't exist, to ensure max number of channels in the server
cur
=
channels_ids
;
while
((
cur
=
(
ngx_http_push_stream_requested_channel_t
*
)
ngx_queue_next
(
&
cur
->
queue
))
!=
channels_ids
)
{
channel
=
ngx_http_push_stream_get_channel
(
cur
->
id
,
r
->
connection
->
log
,
cf
);
if
(
channel
==
NULL
)
{
ngx_log_error
(
NGX_LOG_ERR
,
(
r
)
->
connection
->
log
,
0
,
"push stream module: unable to allocate memory for new channel"
);
*
status_code
=
NGX_HTTP_INTERNAL_SERVER_ERROR
;
*
explain_error_message
=
NULL
;
return
NGX_ERROR
;
}
if
(
channel
==
NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED
)
{
ngx_log_error
(
NGX_LOG_ERR
,
(
r
)
->
connection
->
log
,
0
,
"push stream module: number of channels were exceeded"
);
*
status_code
=
NGX_HTTP_FORBIDDEN
;
*
explain_error_message
=
(
ngx_str_t
*
)
&
NGX_HTTP_PUSH_STREAM_NUMBER_OF_CHANNELS_EXCEEDED_MESSAGE
;
return
NGX_ERROR
;
}
}
return
NGX_OK
;
}
static
void
static
void
ngx_http_push_stream_subscriber_cleanup
(
ngx_http_push_stream_subscriber_cleanup_t
*
data
)
ngx_http_push_stream_subscriber_cleanup
(
ngx_http_push_stream_subscriber_cleanup_t
*
data
)
{
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment