Commit a51c8214 authored by Vysheng's avatar Vysheng

Sometimes client can recover from network loss

parent d7b25cca
...@@ -1062,8 +1062,23 @@ void work_bad_server_salt (struct connection *c UU, long long msg_id UU) { ...@@ -1062,8 +1062,23 @@ void work_bad_server_salt (struct connection *c UU, long long msg_id UU) {
GET_DC(c)->server_salt = new_server_salt; GET_DC(c)->server_salt = new_server_salt;
} }
void work_pong (struct connection *c UU, long long msg_id UU) {
assert (fetch_int () == CODE_pong);
fetch_long (); // msg_id
fetch_long (); // ping_id
}
void work_detained_info (struct connection *c UU, long long msg_id UU) {
assert (fetch_int () == CODE_msg_detained_info);
fetch_long (); // msg_id
fetch_long (); // answer_msg_id
fetch_int (); // bytes
fetch_int (); // status
}
void rpc_execute_answer (struct connection *c, long long msg_id UU) { void rpc_execute_answer (struct connection *c, long long msg_id UU) {
if (verbosity >= 5) { if (verbosity >= 5) {
logprintf ("rpc_execute_answer: fd=%d\n", c->fd);
hexdump_in (); hexdump_in ();
} }
int op = prefetch_int (); int op = prefetch_int ();
...@@ -1098,6 +1113,12 @@ void rpc_execute_answer (struct connection *c, long long msg_id UU) { ...@@ -1098,6 +1113,12 @@ void rpc_execute_answer (struct connection *c, long long msg_id UU) {
case CODE_bad_server_salt: case CODE_bad_server_salt:
work_bad_server_salt (c, msg_id); work_bad_server_salt (c, msg_id);
return; return;
case CODE_pong:
work_pong (c, msg_id);
return;
case CODE_msg_detained_info:
work_detained_info (c, msg_id);
return;
} }
logprintf ( "Unknown message: \n"); logprintf ( "Unknown message: \n");
hexdump_in (); hexdump_in ();
...@@ -1158,6 +1179,7 @@ int process_rpc_message (struct connection *c UU, struct encrypted_message *enc, ...@@ -1158,6 +1179,7 @@ int process_rpc_message (struct connection *c UU, struct encrypted_message *enc,
} }
assert (c->session->session_id == enc->session_id); assert (c->session->session_id == enc->session_id);
rpc_execute_answer (c, enc->msg_id); rpc_execute_answer (c, enc->msg_id);
assert (in_ptr == in_end);
return 0; return 0;
} }
......
...@@ -76,6 +76,8 @@ ...@@ -76,6 +76,8 @@
#define CODE_input_peer_notify_settings_old 0x3cf4b1be #define CODE_input_peer_notify_settings_old 0x3cf4b1be
#define CODE_peer_notify_settings_old 0xddbcd4a5 #define CODE_peer_notify_settings_old 0xddbcd4a5
#define CODE_msg_detained_info 0x276d3ec6
/* not really a limit, for struct encrypted_message only */ /* not really a limit, for struct encrypted_message only */
// #define MAX_MESSAGE_INTS 16384 // #define MAX_MESSAGE_INTS 16384
#define MAX_MESSAGE_INTS 1048576 #define MAX_MESSAGE_INTS 1048576
......
...@@ -42,6 +42,51 @@ DEFINE_TREE(int,int,int_cmp,0) ...@@ -42,6 +42,51 @@ DEFINE_TREE(int,int,int_cmp,0)
int verbosity; int verbosity;
extern struct connection_methods auth_methods; extern struct connection_methods auth_methods;
void fail_connection (struct connection *c);
void start_ping_timer (struct connection *c);
int ping_alarm (struct connection *c) {
if (verbosity > 2) {
logprintf ("ping alarm\n");
}
if (get_double_time () - c->last_receive_time > 20) {
c->state = conn_failed;
fail_connection (c);
} else if (get_double_time () - c->last_receive_time > 5 && c->state == conn_ready) {
int x[3];
x[0] = CODE_ping;
*(long long *)(x + 1) = lrand48 () * (1ll << 32) + lrand48 ();
encrypt_send_message (c, x, 3, 0);
start_ping_timer (c);
} else {
start_ping_timer (c);
}
return 0;
}
void stop_ping_timer (struct connection *c) {
remove_event_timer (&c->ev);
}
void start_ping_timer (struct connection *c) {
c->ev.timeout = get_double_time () + 1;
c->ev.alarm = (void *)ping_alarm;
c->ev.self = c;
insert_event_timer (&c->ev);
}
void restart_connection (struct connection *c);
int fail_alarm (void *ev) {
restart_connection (ev);
return 0;
}
void start_fail_timer (struct connection *c) {
c->ev.timeout = get_double_time () + 10;
c->ev.alarm = (void *)fail_alarm;
c->ev.self = c;
insert_event_timer (&c->ev);
}
struct connection_buffer *new_connection_buffer (int size) { struct connection_buffer *new_connection_buffer (int size) {
struct connection_buffer *b = malloc (sizeof (*b)); struct connection_buffer *b = malloc (sizeof (*b));
memset (b, 0, sizeof (*b)); memset (b, 0, sizeof (*b));
...@@ -195,10 +240,11 @@ struct connection *create_connection (const char *host, int port, struct session ...@@ -195,10 +240,11 @@ struct connection *create_connection (const char *host, int port, struct session
c->session = session; c->session = session;
c->fd = fd; c->fd = fd;
c->ip = htonl (*(int *)h->h_addr); c->ip = strdup (host);
c->flags = 0; c->flags = 0;
c->state = conn_ready; c->state = conn_ready;
c->methods = methods; c->methods = methods;
c->port = port;
assert (!Connections[fd]); assert (!Connections[fd]);
Connections[fd] = c; Connections[fd] = c;
if (verbosity) { if (verbosity) {
...@@ -207,10 +253,60 @@ struct connection *create_connection (const char *host, int port, struct session ...@@ -207,10 +253,60 @@ struct connection *create_connection (const char *host, int port, struct session
if (c->methods->ready) { if (c->methods->ready) {
c->methods->ready (c); c->methods->ready (c);
} }
c->last_receive_time = get_double_time ();
start_ping_timer (c);
return c; return c;
} }
void restart_connection (struct connection *c) {
if (c->last_connect_time == time (0)) {
return;
}
c->last_connect_time = time (0);
int fd;
assert ((fd = socket (AF_INET, SOCK_STREAM, 0)) != -1);
assert (fd >= 0 && fd < MAX_CONNECTIONS);
if (fd > max_connection_fd) {
max_connection_fd = fd;
}
int flags = -1;
setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof (flags));
setsockopt (fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof (flags));
setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof (flags));
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons (c->port);
addr.sin_addr.s_addr = inet_addr (c->ip);
fcntl (fd, F_SETFL, O_NONBLOCK);
if (connect (fd, (struct sockaddr *) &addr, sizeof (addr)) == -1) {
if (errno != EINPROGRESS) {
logprintf ( "Can not connect to %s:%d %m\n", c->ip, c->port);
start_fail_timer (c);
close (fd);
return;
}
}
c->fd = fd;
c->state = conn_connecting;
c->last_receive_time = get_double_time ();
start_ping_timer (c);
Connections[fd] = c;
char byte = 0xef;
assert (write_out (c, &byte, 1) == 1);
flush_out (c);
}
void fail_connection (struct connection *c) { void fail_connection (struct connection *c) {
if (c->state == conn_ready || c->state == conn_connecting) {
stop_ping_timer (c);
}
struct connection_buffer *b = c->out_head; struct connection_buffer *b = c->out_head;
while (b) { while (b) {
struct connection_buffer *d = b; struct connection_buffer *d = b;
...@@ -226,6 +322,10 @@ void fail_connection (struct connection *c) { ...@@ -226,6 +322,10 @@ void fail_connection (struct connection *c) {
c->out_head = c->out_tail = c->in_head = c->in_tail = 0; c->out_head = c->out_tail = c->in_head = c->in_tail = 0;
c->state = conn_failed; c->state = conn_failed;
c->out_bytes = c->in_bytes = 0; c->out_bytes = c->in_bytes = 0;
close (c->fd);
Connections[c->fd] = 0;
logprintf ("Lost connection to server... \n");
restart_connection (c);
} }
void try_write (struct connection *c) { void try_write (struct connection *c) {
...@@ -334,6 +434,11 @@ void try_read (struct connection *c) { ...@@ -334,6 +434,11 @@ void try_read (struct connection *c) {
int x = 0; int x = 0;
while (1) { while (1) {
int r = read (c->fd, c->in_tail->wptr, c->in_tail->end - c->in_tail->wptr); int r = read (c->fd, c->in_tail->wptr, c->in_tail->end - c->in_tail->wptr);
if (r > 0) {
c->last_receive_time = get_double_time ();
stop_ping_timer (c);
start_ping_timer (c);
}
if (r >= 0) { if (r >= 0) {
c->in_tail->wptr += r; c->in_tail->wptr += r;
x += r; x += r;
...@@ -364,16 +469,21 @@ void try_read (struct connection *c) { ...@@ -364,16 +469,21 @@ void try_read (struct connection *c) {
int connections_make_poll_array (struct pollfd *fds, int max) { int connections_make_poll_array (struct pollfd *fds, int max) {
int _max = max; int _max = max;
int i; int i;
for (i = 0; i <= max_connection_fd; i++) if (Connections[i] && Connections[i]->state != conn_failed) { for (i = 0; i <= max_connection_fd; i++) {
assert (max > 0); if (Connections[i] && Connections[i]->state == conn_failed) {
struct connection *c = Connections[i]; restart_connection (Connections[i]);
fds[0].fd = c->fd; }
fds[0].events = POLLERR | POLLHUP | POLLRDHUP | POLLIN; if (Connections[i] && Connections[i]->state != conn_failed) {
if (c->out_bytes || c->state == conn_connecting) { assert (max > 0);
fds[0].events |= POLLOUT; struct connection *c = Connections[i];
fds[0].fd = c->fd;
fds[0].events = POLLERR | POLLHUP | POLLRDHUP | POLLIN;
if (c->out_bytes || c->state == conn_connecting) {
fds[0].events |= POLLOUT;
}
fds ++;
max --;
} }
fds ++;
max --;
} }
if (verbosity >= 10) { if (verbosity >= 10) {
logprintf ( "%d connections in poll\n", _max - max); logprintf ( "%d connections in poll\n", _max - max);
...@@ -398,7 +508,9 @@ void connections_poll_result (struct pollfd *fds, int max) { ...@@ -398,7 +508,9 @@ void connections_poll_result (struct pollfd *fds, int max) {
fail_connection (c); fail_connection (c);
} else if (fds[i].revents & POLLOUT) { } else if (fds[i].revents & POLLOUT) {
if (c->state == conn_connecting) { if (c->state == conn_connecting) {
logprintf ("connection ready\n");
c->state = conn_ready; c->state = conn_ready;
c->last_receive_time = get_double_time ();
} }
if (c->out_bytes) { if (c->out_bytes) {
try_write (c); try_write (c);
......
...@@ -32,7 +32,7 @@ struct dc; ...@@ -32,7 +32,7 @@ struct dc;
#define ACK_TIMEOUT 60 #define ACK_TIMEOUT 60
#define MAX_DC_ID 10 #define MAX_DC_ID 10
enum dc_state{ enum dc_state {
st_init, st_init,
st_reqpq_sent, st_reqpq_sent,
st_reqdh_sent, st_reqdh_sent,
...@@ -104,7 +104,7 @@ enum conn_state { ...@@ -104,7 +104,7 @@ enum conn_state {
struct connection { struct connection {
int fd; int fd;
int ip; char *ip;
int port; int port;
int flags; int flags;
enum conn_state state; enum conn_state state;
...@@ -117,9 +117,12 @@ struct connection { ...@@ -117,9 +117,12 @@ struct connection {
int out_bytes; int out_bytes;
int packet_num; int packet_num;
int out_packet_num; int out_packet_num;
int last_connect_time;
struct connection_methods *methods; struct connection_methods *methods;
struct session *session; struct session *session;
void *extra; void *extra;
struct event_timer ev;
double last_receive_time;
}; };
extern struct connection *Connections[]; extern struct connection *Connections[];
......
...@@ -1000,8 +1000,15 @@ int user_info_on_answer (struct query *q UU) { ...@@ -1000,8 +1000,15 @@ int user_info_on_answer (struct query *q UU) {
printf ("User "); printf ("User ");
print_user_name (U->id, C); print_user_name (U->id, C);
printf (":\n"); printf (":\n");
printf ("\t real name: %s %s\n", U->real_first_name, U->real_last_name); printf ("\treal name: %s %s\n", U->real_first_name, U->real_last_name);
printf ("\t phone: %s\n", U->phone); printf ("\tphone: %s\n", U->phone);
if (U->status.online > 0) {
printf ("\tonline\n");
} else {
printf ("\toffline (was online ");
print_date_full (U->status.when);
printf (")\n");
}
pop_color (); pop_color ();
print_end (); print_end ();
return 0; return 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment