Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nginx-push-stream-module
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Administrator
nginx-push-stream-module
Commits
ae39453b
Commit
ae39453b
authored
Sep 08, 2013
by
Wandenberg
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
rename module context struct
parent
32d59def
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
24 additions
and
24 deletions
+24
-24
ngx_http_push_stream_module.h
include/ngx_http_push_stream_module.h
+2
-2
ngx_http_push_stream_module_utils.h
include/ngx_http_push_stream_module_utils.h
+1
-1
ngx_http_push_stream_module_ipc.c
src/ngx_http_push_stream_module_ipc.c
+1
-1
ngx_http_push_stream_module_publisher.c
src/ngx_http_push_stream_module_publisher.c
+1
-1
ngx_http_push_stream_module_subscriber.c
src/ngx_http_push_stream_module_subscriber.c
+5
-5
ngx_http_push_stream_module_utils.c
src/ngx_http_push_stream_module_utils.c
+12
-12
ngx_http_push_stream_module_websocket.c
src/ngx_http_push_stream_module_websocket.c
+2
-2
No files found.
include/ngx_http_push_stream_module.h
View file @
ae39453b
...
@@ -179,7 +179,7 @@ typedef struct {
...
@@ -179,7 +179,7 @@ typedef struct {
ngx_chain_t
*
busy
;
ngx_chain_t
*
busy
;
ngx_http_push_stream_padding_t
*
padding
;
ngx_http_push_stream_padding_t
*
padding
;
ngx_str_t
*
callback
;
ngx_str_t
*
callback
;
}
ngx_http_push_stream_
subscriber
_ctx_t
;
}
ngx_http_push_stream_
module
_ctx_t
;
// messages to worker processes
// messages to worker processes
typedef
struct
{
typedef
struct
{
...
@@ -236,7 +236,7 @@ static ngx_int_t ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf
...
@@ -236,7 +236,7 @@ static ngx_int_t ngx_http_push_stream_find_or_add_template(ngx_conf_t *cf
static
const
ngx_str_t
NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID
=
ngx_string
(
"ALL"
);
static
const
ngx_str_t
NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID
=
ngx_string
(
"ALL"
);
static
const
ngx_str_t
NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE
=
ngx_string
(
"No channel id provided."
);
static
const
ngx_str_t
NGX_HTTP_PUSH_STREAM_NO_CHANNEL_ID_MESSAGE
=
ngx_string
(
"No channel id provided."
);
static
const
ngx_str_t
NGX_HTTP_PUSH_STREAM_
NO_
CHANNEL_ID_NOT_AUTHORIZED_MESSAGE
=
ngx_string
(
"Channel id not authorized for this method."
);
static
const
ngx_str_t
NGX_HTTP_PUSH_STREAM_CHANNEL_ID_NOT_AUTHORIZED_MESSAGE
=
ngx_string
(
"Channel id not authorized for this method."
);
static
const
ngx_str_t
NGX_HTTP_PUSH_STREAM_EMPTY_POST_REQUEST_MESSAGE
=
ngx_string
(
"Empty post requests are not allowed."
);
static
const
ngx_str_t
NGX_HTTP_PUSH_STREAM_EMPTY_POST_REQUEST_MESSAGE
=
ngx_string
(
"Empty post requests are not allowed."
);
static
const
ngx_str_t
NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE
=
ngx_string
(
"Channel id is too large."
);
static
const
ngx_str_t
NGX_HTTP_PUSH_STREAM_TOO_LARGE_CHANNEL_ID_MESSAGE
=
ngx_string
(
"Channel id is too large."
);
static
const
ngx_str_t
NGX_HTTP_PUSH_STREAM_TOO_MUCH_WILDCARD_CHANNELS
=
ngx_string
(
"Subscribed too much wildcard channels."
);
static
const
ngx_str_t
NGX_HTTP_PUSH_STREAM_TOO_MUCH_WILDCARD_CHANNELS
=
ngx_string
(
"Subscribed too much wildcard channels."
);
...
...
include/ngx_http_push_stream_module_utils.h
View file @
ae39453b
...
@@ -283,7 +283,7 @@ static ngx_http_push_stream_content_subtype_t * ngx_http_push_stream_match_c
...
@@ -283,7 +283,7 @@ static ngx_http_push_stream_content_subtype_t * ngx_http_push_stream_match_c
static
ngx_http_push_stream_line_t
*
ngx_http_push_stream_split_by_crlf
(
ngx_str_t
*
msg
,
ngx_pool_t
*
temp_pool
);
static
ngx_http_push_stream_line_t
*
ngx_http_push_stream_split_by_crlf
(
ngx_str_t
*
msg
,
ngx_pool_t
*
temp_pool
);
static
ngx_str_t
*
ngx_http_push_stream_join_with_crlf
(
ngx_http_push_stream_line_t
*
lines
,
ngx_pool_t
*
temp_pool
);
static
ngx_str_t
*
ngx_http_push_stream_join_with_crlf
(
ngx_http_push_stream_line_t
*
lines
,
ngx_pool_t
*
temp_pool
);
static
ngx_http_push_stream_
subscriber_ctx_t
*
ngx_http_push_stream_add_request_context
(
ngx_http_request_t
*
r
);
static
ngx_http_push_stream_
module_ctx_t
*
ngx_http_push_stream_add_request_context
(
ngx_http_request_t
*
r
);
static
ngx_http_push_stream_padding_t
*
ngx_http_push_stream_parse_paddings
(
ngx_conf_t
*
cf
,
ngx_str_t
*
paddings_by_user_agent
);
static
ngx_http_push_stream_padding_t
*
ngx_http_push_stream_parse_paddings
(
ngx_conf_t
*
cf
,
ngx_str_t
*
paddings_by_user_agent
);
...
...
src/ngx_http_push_stream_module_ipc.c
View file @
ae39453b
...
@@ -435,7 +435,7 @@ ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *chan
...
@@ -435,7 +435,7 @@ ngx_http_push_stream_respond_to_subscribers(ngx_http_push_stream_channel_t *chan
ngx_http_push_stream_send_response_finalize
(
subscriber
->
request
);
ngx_http_push_stream_send_response_finalize
(
subscriber
->
request
);
cur
=
prev
;
cur
=
prev
;
}
else
{
}
else
{
ngx_http_push_stream_
subscriber_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
subscriber
->
request
,
ngx_http_push_stream_module
);
ngx_http_push_stream_
module_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
subscriber
->
request
,
ngx_http_push_stream_module
);
ngx_http_push_stream_loc_conf_t
*
pslcf
=
ngx_http_get_module_loc_conf
(
subscriber
->
request
,
ngx_http_push_stream_module
);
ngx_http_push_stream_loc_conf_t
*
pslcf
=
ngx_http_get_module_loc_conf
(
subscriber
->
request
,
ngx_http_push_stream_module
);
ngx_http_push_stream_timer_reset
(
pslcf
->
ping_message_interval
,
ctx
->
ping_timer
);
ngx_http_push_stream_timer_reset
(
pslcf
->
ping_message_interval
,
ctx
->
ping_timer
);
}
}
...
...
src/ngx_http_push_stream_module_publisher.c
View file @
ae39453b
...
@@ -75,7 +75,7 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
...
@@ -75,7 +75,7 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r)
if
(
r
->
method
&
(
NGX_HTTP_POST
|
NGX_HTTP_PUT
))
{
if
(
r
->
method
&
(
NGX_HTTP_POST
|
NGX_HTTP_PUT
))
{
// check if channel id isn't equals to ALL or contain wildcard
// check if channel id isn't equals to ALL or contain wildcard
if
((
ngx_memn2cmp
(
id
->
data
,
NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID
.
data
,
id
->
len
,
NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID
.
len
)
==
0
)
||
(
ngx_strchr
(
id
->
data
,
'*'
)
!=
NULL
))
{
if
((
ngx_memn2cmp
(
id
->
data
,
NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID
.
data
,
id
->
len
,
NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID
.
len
)
==
0
)
||
(
ngx_strchr
(
id
->
data
,
'*'
)
!=
NULL
))
{
return
ngx_http_push_stream_send_only_header_response
(
r
,
NGX_HTTP_FORBIDDEN
,
&
NGX_HTTP_PUSH_STREAM_
NO_
CHANNEL_ID_NOT_AUTHORIZED_MESSAGE
);
return
ngx_http_push_stream_send_only_header_response
(
r
,
NGX_HTTP_FORBIDDEN
,
&
NGX_HTTP_PUSH_STREAM_CHANNEL_ID_NOT_AUTHORIZED_MESSAGE
);
}
}
// create the channel if doesn't exist
// create the channel if doesn't exist
...
...
src/ngx_http_push_stream_module_subscriber.c
View file @
ae39453b
...
@@ -43,7 +43,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
...
@@ -43,7 +43,7 @@ ngx_http_push_stream_subscriber_handler(ngx_http_request_t *r)
ngx_http_push_stream_loc_conf_t
*
cf
=
ngx_http_get_module_loc_conf
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_loc_conf_t
*
cf
=
ngx_http_get_module_loc_conf
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_subscriber_t
*
worker_subscriber
;
ngx_http_push_stream_subscriber_t
*
worker_subscriber
;
ngx_http_push_stream_requested_channel_t
*
channels_ids
,
*
cur
;
ngx_http_push_stream_requested_channel_t
*
channels_ids
,
*
cur
;
ngx_http_push_stream_
subscriber_ctx_t
*
ctx
;
ngx_http_push_stream_
module_ctx_t
*
ctx
;
ngx_int_t
tag
;
ngx_int_t
tag
;
time_t
if_modified_since
;
time_t
if_modified_since
;
ngx_str_t
*
last_event_id
=
NULL
;
ngx_str_t
*
last_event_id
=
NULL
;
...
@@ -148,7 +148,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
...
@@ -148,7 +148,7 @@ ngx_http_push_stream_subscriber_polling_handler(ngx_http_request_t *r, ngx_http_
{
{
ngx_http_push_stream_loc_conf_t
*
cf
=
ngx_http_get_module_loc_conf
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_loc_conf_t
*
cf
=
ngx_http_get_module_loc_conf
(
r
,
ngx_http_push_stream_module
);
ngx_slab_pool_t
*
shpool
=
(
ngx_slab_pool_t
*
)
ngx_http_push_stream_shm_zone
->
shm
.
addr
;
ngx_slab_pool_t
*
shpool
=
(
ngx_slab_pool_t
*
)
ngx_http_push_stream_shm_zone
->
shm
.
addr
;
ngx_http_push_stream_
subscriber_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_
module_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_requested_channel_t
*
cur
;
ngx_http_push_stream_requested_channel_t
*
cur
;
ngx_http_push_stream_subscriber_t
*
worker_subscriber
;
ngx_http_push_stream_subscriber_t
*
worker_subscriber
;
ngx_http_push_stream_channel_t
*
channel
;
ngx_http_push_stream_channel_t
*
channel
;
...
@@ -326,7 +326,7 @@ ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stre
...
@@ -326,7 +326,7 @@ ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stre
// could not be ALL channel or contain wildcard
// could not be ALL channel or contain wildcard
if
((
ngx_memn2cmp
(
cur
->
id
->
data
,
NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID
.
data
,
cur
->
id
->
len
,
NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID
.
len
)
==
0
)
||
(
ngx_strchr
(
cur
->
id
->
data
,
'*'
)
!=
NULL
))
{
if
((
ngx_memn2cmp
(
cur
->
id
->
data
,
NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID
.
data
,
cur
->
id
->
len
,
NGX_HTTP_PUSH_STREAM_ALL_CHANNELS_INFO_ID
.
len
)
==
0
)
||
(
ngx_strchr
(
cur
->
id
->
data
,
'*'
)
!=
NULL
))
{
*
status_code
=
NGX_HTTP_FORBIDDEN
;
*
status_code
=
NGX_HTTP_FORBIDDEN
;
*
explain_error_message
=
(
ngx_str_t
*
)
&
NGX_HTTP_PUSH_STREAM_
NO_
CHANNEL_ID_NOT_AUTHORIZED_MESSAGE
;
*
explain_error_message
=
(
ngx_str_t
*
)
&
NGX_HTTP_PUSH_STREAM_CHANNEL_ID_NOT_AUTHORIZED_MESSAGE
;
return
NGX_ERROR
;
return
NGX_ERROR
;
}
}
...
@@ -396,7 +396,7 @@ static ngx_http_push_stream_subscriber_t *
...
@@ -396,7 +396,7 @@ static ngx_http_push_stream_subscriber_t *
ngx_http_push_stream_subscriber_prepare_request_to_keep_connected
(
ngx_http_request_t
*
r
)
ngx_http_push_stream_subscriber_prepare_request_to_keep_connected
(
ngx_http_request_t
*
r
)
{
{
ngx_http_push_stream_loc_conf_t
*
cf
=
ngx_http_get_module_loc_conf
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_loc_conf_t
*
cf
=
ngx_http_get_module_loc_conf
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_
subscriber_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_
module_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_subscriber_t
*
worker_subscriber
;
ngx_http_push_stream_subscriber_t
*
worker_subscriber
;
if
((
worker_subscriber
=
ngx_pcalloc
(
r
->
pool
,
sizeof
(
ngx_http_push_stream_subscriber_t
)))
==
NULL
)
{
if
((
worker_subscriber
=
ngx_pcalloc
(
r
->
pool
,
sizeof
(
ngx_http_push_stream_subscriber_t
)))
==
NULL
)
{
...
@@ -437,7 +437,7 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_
...
@@ -437,7 +437,7 @@ ngx_http_push_stream_registry_subscriber_locked(ngx_http_request_t *r, ngx_http_
ngx_http_push_stream_worker_data_t
*
thisworker_data
=
data
->
ipc
+
ngx_process_slot
;
ngx_http_push_stream_worker_data_t
*
thisworker_data
=
data
->
ipc
+
ngx_process_slot
;
ngx_http_push_stream_loc_conf_t
*
cf
=
ngx_http_get_module_loc_conf
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_loc_conf_t
*
cf
=
ngx_http_get_module_loc_conf
(
r
,
ngx_http_push_stream_module
);
ngx_msec_t
connection_ttl
=
worker_subscriber
->
longpolling
?
cf
->
longpolling_connection_ttl
:
cf
->
subscriber_connection_ttl
;
ngx_msec_t
connection_ttl
=
worker_subscriber
->
longpolling
?
cf
->
longpolling_connection_ttl
:
cf
->
subscriber_connection_ttl
;
ngx_http_push_stream_
subscriber_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_
module_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
// adding subscriber to worker list of subscribers
// adding subscriber to worker list of subscribers
ngx_queue_insert_tail
(
&
thisworker_data
->
subscribers_queue
,
&
worker_subscriber
->
worker_queue
);
ngx_queue_insert_tail
(
&
thisworker_data
->
subscribers_queue
,
&
worker_subscriber
->
worker_queue
);
...
...
src/ngx_http_push_stream_module_utils.c
View file @
ae39453b
...
@@ -458,7 +458,7 @@ static ngx_int_t
...
@@ -458,7 +458,7 @@ static ngx_int_t
ngx_http_push_stream_send_response_message
(
ngx_http_request_t
*
r
,
ngx_http_push_stream_channel_t
*
channel
,
ngx_http_push_stream_msg_t
*
msg
,
ngx_flag_t
send_callback
,
ngx_flag_t
send_separator
)
ngx_http_push_stream_send_response_message
(
ngx_http_request_t
*
r
,
ngx_http_push_stream_channel_t
*
channel
,
ngx_http_push_stream_msg_t
*
msg
,
ngx_flag_t
send_callback
,
ngx_flag_t
send_separator
)
{
{
ngx_http_push_stream_loc_conf_t
*
pslcf
=
ngx_http_get_module_loc_conf
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_loc_conf_t
*
pslcf
=
ngx_http_get_module_loc_conf
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_
subscriber_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_
module_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_flag_t
use_jsonp
=
(
ctx
!=
NULL
)
&&
(
ctx
->
callback
!=
NULL
);
ngx_flag_t
use_jsonp
=
(
ctx
!=
NULL
)
&&
(
ctx
->
callback
!=
NULL
);
ngx_int_t
rc
=
NGX_OK
;
ngx_int_t
rc
=
NGX_OK
;
...
@@ -509,7 +509,7 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_
...
@@ -509,7 +509,7 @@ ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_
ngx_chain_t
*
ngx_chain_t
*
ngx_http_push_stream_get_buf
(
ngx_http_request_t
*
r
)
ngx_http_push_stream_get_buf
(
ngx_http_request_t
*
r
)
{
{
ngx_http_push_stream_
subscriber_ctx_t
*
ctx
=
NULL
;
ngx_http_push_stream_
module_ctx_t
*
ctx
=
NULL
;
ngx_chain_t
*
out
=
NULL
;
ngx_chain_t
*
out
=
NULL
;
if
((
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
))
!=
NULL
)
{
if
((
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
))
!=
NULL
)
{
...
@@ -536,7 +536,7 @@ ngx_http_push_stream_get_buf(ngx_http_request_t *r)
...
@@ -536,7 +536,7 @@ ngx_http_push_stream_get_buf(ngx_http_request_t *r)
ngx_int_t
ngx_int_t
ngx_http_push_stream_output_filter
(
ngx_http_request_t
*
r
,
ngx_chain_t
*
in
)
ngx_http_push_stream_output_filter
(
ngx_http_request_t
*
r
,
ngx_chain_t
*
in
)
{
{
ngx_http_push_stream_
subscriber_ctx_t
*
ctx
=
NULL
;
ngx_http_push_stream_
module_ctx_t
*
ctx
=
NULL
;
ngx_int_t
rc
;
ngx_int_t
rc
;
rc
=
ngx_http_output_filter
(
r
,
in
);
rc
=
ngx_http_output_filter
(
r
,
in
);
...
@@ -608,7 +608,7 @@ ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *tex
...
@@ -608,7 +608,7 @@ ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *tex
static
ngx_int_t
static
ngx_int_t
ngx_http_push_stream_send_response_padding
(
ngx_http_request_t
*
r
,
size_t
len
,
ngx_flag_t
sending_header
)
ngx_http_push_stream_send_response_padding
(
ngx_http_request_t
*
r
,
size_t
len
,
ngx_flag_t
sending_header
)
{
{
ngx_http_push_stream_
subscriber
_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_
module
_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
if
(
ctx
->
padding
!=
NULL
)
{
if
(
ctx
->
padding
!=
NULL
)
{
ngx_int_t
diff
=
((
sending_header
)
?
ctx
->
padding
->
header_min_len
:
ctx
->
padding
->
message_min_len
)
-
len
;
ngx_int_t
diff
=
((
sending_header
)
?
ctx
->
padding
->
header_min_len
:
ctx
->
padding
->
message_min_len
)
-
len
;
...
@@ -976,7 +976,7 @@ ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev)
...
@@ -976,7 +976,7 @@ ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev)
if
(
rc
!=
NGX_OK
)
{
if
(
rc
!=
NGX_OK
)
{
ngx_http_push_stream_send_response_finalize
(
r
);
ngx_http_push_stream_send_response_finalize
(
r
);
}
else
{
}
else
{
ngx_http_push_stream_
subscriber
_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_
module
_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_timer_reset
(
pslcf
->
ping_message_interval
,
ctx
->
ping_timer
);
ngx_http_push_stream_timer_reset
(
pslcf
->
ping_message_interval
,
ctx
->
ping_timer
);
}
}
}
}
...
@@ -985,7 +985,7 @@ static void
...
@@ -985,7 +985,7 @@ static void
ngx_http_push_stream_disconnect_timer_wake_handler
(
ngx_event_t
*
ev
)
ngx_http_push_stream_disconnect_timer_wake_handler
(
ngx_event_t
*
ev
)
{
{
ngx_http_request_t
*
r
=
(
ngx_http_request_t
*
)
ev
->
data
;
ngx_http_request_t
*
r
=
(
ngx_http_request_t
*
)
ev
->
data
;
ngx_http_push_stream_
subscriber_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_
module_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
if
(
ctx
->
longpolling
)
{
if
(
ctx
->
longpolling
)
{
ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout
(
r
);
ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout
(
r
);
...
@@ -1102,17 +1102,17 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx
...
@@ -1102,17 +1102,17 @@ ngx_http_push_stream_format_message(ngx_http_push_stream_channel_t *channel, ngx
}
}
static
ngx_http_push_stream_
subscriber
_ctx_t
*
static
ngx_http_push_stream_
module
_ctx_t
*
ngx_http_push_stream_add_request_context
(
ngx_http_request_t
*
r
)
ngx_http_push_stream_add_request_context
(
ngx_http_request_t
*
r
)
{
{
ngx_pool_cleanup_t
*
cln
;
ngx_pool_cleanup_t
*
cln
;
ngx_http_push_stream_
subscriber_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_
module_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
if
(
ctx
!=
NULL
)
{
if
(
ctx
!=
NULL
)
{
return
ctx
;
return
ctx
;
}
}
if
((
ctx
=
ngx_pcalloc
(
r
->
pool
,
sizeof
(
ngx_http_push_stream_
subscriber
_ctx_t
)))
==
NULL
)
{
if
((
ctx
=
ngx_pcalloc
(
r
->
pool
,
sizeof
(
ngx_http_push_stream_
module
_ctx_t
)))
==
NULL
)
{
return
NULL
;
return
NULL
;
}
}
...
@@ -1148,7 +1148,7 @@ static void
...
@@ -1148,7 +1148,7 @@ static void
ngx_http_push_stream_cleanup_request_context
(
ngx_http_request_t
*
r
)
ngx_http_push_stream_cleanup_request_context
(
ngx_http_request_t
*
r
)
{
{
ngx_slab_pool_t
*
shpool
=
(
ngx_slab_pool_t
*
)
ngx_http_push_stream_shm_zone
->
shm
.
addr
;
ngx_slab_pool_t
*
shpool
=
(
ngx_slab_pool_t
*
)
ngx_http_push_stream_shm_zone
->
shm
.
addr
;
ngx_http_push_stream_
subscriber_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_
module_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_shmtx_lock
(
&
shpool
->
mutex
);
ngx_shmtx_lock
(
&
shpool
->
mutex
);
if
(
ctx
!=
NULL
)
{
if
(
ctx
!=
NULL
)
{
...
@@ -1456,7 +1456,7 @@ ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_
...
@@ -1456,7 +1456,7 @@ ngx_http_push_stream_apply_template_to_each_line(ngx_str_t *text, const ngx_str_
static
void
static
void
ngx_http_push_stream_add_polling_headers
(
ngx_http_request_t
*
r
,
time_t
last_modified_time
,
ngx_int_t
tag
,
ngx_pool_t
*
temp_pool
)
ngx_http_push_stream_add_polling_headers
(
ngx_http_request_t
*
r
,
time_t
last_modified_time
,
ngx_int_t
tag
,
ngx_pool_t
*
temp_pool
)
{
{
ngx_http_push_stream_
subscriber
_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_
module
_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
if
(
ctx
->
callback
!=
NULL
)
{
if
(
ctx
->
callback
!=
NULL
)
{
r
->
headers_out
.
content_type_len
=
NGX_HTTP_PUSH_STREAM_CALLBACK_CONTENT_TYPE
.
len
;
r
->
headers_out
.
content_type_len
=
NGX_HTTP_PUSH_STREAM_CALLBACK_CONTENT_TYPE
.
len
;
...
@@ -1483,7 +1483,7 @@ ngx_http_push_stream_add_polling_headers(ngx_http_request_t *r, time_t last_modi
...
@@ -1483,7 +1483,7 @@ ngx_http_push_stream_add_polling_headers(ngx_http_request_t *r, time_t last_modi
static
void
static
void
ngx_http_push_stream_get_last_received_message_values
(
ngx_http_request_t
*
r
,
time_t
*
if_modified_since
,
ngx_int_t
*
tag
,
ngx_str_t
**
last_event_id
)
ngx_http_push_stream_get_last_received_message_values
(
ngx_http_request_t
*
r
,
time_t
*
if_modified_since
,
ngx_int_t
*
tag
,
ngx_str_t
**
last_event_id
)
{
{
ngx_http_push_stream_
subscriber_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_
module_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_loc_conf_t
*
cf
=
ngx_http_get_module_loc_conf
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_loc_conf_t
*
cf
=
ngx_http_get_module_loc_conf
(
r
,
ngx_http_push_stream_module
);
ngx_str_t
*
etag
=
NULL
,
vv_etag
=
ngx_null_string
;
ngx_str_t
*
etag
=
NULL
,
vv_etag
=
ngx_null_string
;
ngx_str_t
vv_event_id
=
ngx_null_string
,
vv_time
=
ngx_null_string
;
ngx_str_t
vv_event_id
=
ngx_null_string
,
vv_time
=
ngx_null_string
;
...
...
src/ngx_http_push_stream_module_websocket.c
View file @
ae39453b
...
@@ -39,7 +39,7 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
...
@@ -39,7 +39,7 @@ ngx_http_push_stream_websocket_handler(ngx_http_request_t *r)
ngx_http_push_stream_loc_conf_t
*
cf
=
ngx_http_get_module_loc_conf
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_loc_conf_t
*
cf
=
ngx_http_get_module_loc_conf
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_subscriber_t
*
worker_subscriber
;
ngx_http_push_stream_subscriber_t
*
worker_subscriber
;
ngx_http_push_stream_requested_channel_t
*
channels_ids
,
*
cur
;
ngx_http_push_stream_requested_channel_t
*
channels_ids
,
*
cur
;
ngx_http_push_stream_
subscriber_ctx_t
*
ctx
;
ngx_http_push_stream_
module_ctx_t
*
ctx
;
ngx_int_t
tag
;
ngx_int_t
tag
;
time_t
if_modified_since
;
time_t
if_modified_since
;
ngx_str_t
*
last_event_id
=
NULL
;
ngx_str_t
*
last_event_id
=
NULL
;
...
@@ -282,7 +282,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
...
@@ -282,7 +282,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
}
}
}
}
ngx_http_push_stream_
subscriber
_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_
module
_ctx_t
*
ctx
=
ngx_http_get_module_ctx
(
r
,
ngx_http_push_stream_module
);
ngx_http_push_stream_subscription_t
*
subscription
=
(
ngx_http_push_stream_subscription_t
*
)
ngx_queue_head
(
&
ctx
->
subscriber
->
subscriptions_sentinel
.
queue
);
ngx_http_push_stream_subscription_t
*
subscription
=
(
ngx_http_push_stream_subscription_t
*
)
ngx_queue_head
(
&
ctx
->
subscriber
->
subscriptions_sentinel
.
queue
);
if
(
ngx_http_push_stream_add_msg_to_channel
(
r
,
&
subscription
->
channel
->
id
,
frame
.
payload
,
frame
.
payload_len
,
NULL
,
NULL
,
temp_pool
)
==
NULL
)
{
if
(
ngx_http_push_stream_add_msg_to_channel
(
r
,
&
subscription
->
channel
->
id
,
frame
.
payload
,
frame
.
payload_len
,
NULL
,
NULL
,
temp_pool
)
==
NULL
)
{
ngx_http_finalize_request
(
r
,
NGX_OK
);
ngx_http_finalize_request
(
r
,
NGX_OK
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment