Commit 3c629b22 authored by Vysheng's avatar Vysheng

fixed acks

parent d9478695
...@@ -1416,9 +1416,10 @@ void work_container (struct connection *c, long long msg_id UU) { ...@@ -1416,9 +1416,10 @@ void work_container (struct connection *c, long long msg_id UU) {
int i; int i;
for (i = 0; i < n; i++) { for (i = 0; i < n; i++) {
long long id = fetch_long (); long long id = fetch_long ();
int seqno = fetch_int (); //int seqno = fetch_int ();
if (seqno & 1) { fetch_int (); // seq_no
insert_seqno (c->session, seqno); if (id & 1) {
insert_msg_id (c->session, id);
} }
int bytes = fetch_int (); int bytes = fetch_int ();
int *t = in_end; int *t = in_end;
...@@ -1537,7 +1538,7 @@ void work_detailed_info (struct connection *c UU, long long msg_id UU) { ...@@ -1537,7 +1538,7 @@ void work_detailed_info (struct connection *c UU, long long msg_id UU) {
} }
void work_new_detailed_info (struct connection *c UU, long long msg_id UU) { void work_new_detailed_info (struct connection *c UU, long long msg_id UU) {
assert (fetch_int () == CODE_msg_detailed_info); assert (fetch_int () == (int)CODE_msg_new_detailed_info);
fetch_long (); // answer_msg_id fetch_long (); // answer_msg_id
fetch_int (); // bytes fetch_int (); // bytes
fetch_int (); // status fetch_int (); // status
...@@ -1548,6 +1549,15 @@ void work_updates_to_long (struct connection *c UU, long long msg_id UU) { ...@@ -1548,6 +1549,15 @@ void work_updates_to_long (struct connection *c UU, long long msg_id UU) {
logprintf ("updates to long... Getting difference\n"); logprintf ("updates to long... Getting difference\n");
do_get_difference (); do_get_difference ();
} }
void work_bad_msg_notification (struct connection *c UU, long long msg_id UU) {
assert (fetch_int () == (int)CODE_bad_msg_notification);
long long m1 = fetch_long ();
int s = fetch_int ();
int e = fetch_int ();
logprintf ("bad_msg_notification: msg_id = %lld, seq = %d, error = %d\n", m1, s, e);
}
void rpc_execute_answer (struct connection *c, long long msg_id UU) { void rpc_execute_answer (struct connection *c, long long msg_id UU) {
if (verbosity >= 5) { if (verbosity >= 5) {
logprintf ("rpc_execute_answer: fd=%d\n", c->fd); logprintf ("rpc_execute_answer: fd=%d\n", c->fd);
...@@ -1597,6 +1607,9 @@ void rpc_execute_answer (struct connection *c, long long msg_id UU) { ...@@ -1597,6 +1607,9 @@ void rpc_execute_answer (struct connection *c, long long msg_id UU) {
case CODE_updates_too_long: case CODE_updates_too_long:
work_updates_to_long (c, msg_id); work_updates_to_long (c, msg_id);
return; return;
case CODE_bad_msg_notification:
work_bad_msg_notification (c, msg_id);
return;
} }
logprintf ( "Unknown message: \n"); logprintf ( "Unknown message: \n");
hexdump_in (); hexdump_in ();
...@@ -1629,10 +1642,17 @@ int process_rpc_message (struct connection *c UU, struct encrypted_message *enc, ...@@ -1629,10 +1642,17 @@ int process_rpc_message (struct connection *c UU, struct encrypted_message *enc,
int this_server_time = enc->msg_id >> 32LL; int this_server_time = enc->msg_id >> 32LL;
if (!DC->server_time_delta) { if (!DC->server_time_delta) {
DC->server_time_delta = this_server_time - time (0); DC->server_time_delta = this_server_time - get_utime (CLOCK_REALTIME);
DC->server_time_udelta = this_server_time - get_utime (CLOCK_MONOTONIC); DC->server_time_udelta = this_server_time - get_utime (CLOCK_MONOTONIC);
} }
double st = get_server_time (DC); double st = get_server_time (DC);
if (this_server_time < st - 300 || this_server_time > st + 30) {
logprintf ("salt = %lld, session_id = %lld, msg_id = %lld, seq_no = %d, st = %lf, now = %lf\n", enc->server_salt, enc->session_id, enc->msg_id, enc->seq_no, st, get_utime (CLOCK_REALTIME));
in_ptr = enc->message;
in_end = in_ptr + (enc->msg_len / 4);
hexdump_in ();
}
assert (this_server_time >= st - 300 && this_server_time <= st + 30); assert (this_server_time >= st - 300 && this_server_time <= st + 30);
//assert (enc->msg_id > server_last_msg_id && (enc->msg_id & 3) == 1); //assert (enc->msg_id > server_last_msg_id && (enc->msg_id & 3) == 1);
if (verbosity >= 1) { if (verbosity >= 1) {
...@@ -1651,8 +1671,8 @@ int process_rpc_message (struct connection *c UU, struct encrypted_message *enc, ...@@ -1651,8 +1671,8 @@ int process_rpc_message (struct connection *c UU, struct encrypted_message *enc,
in_ptr = enc->message; in_ptr = enc->message;
in_end = in_ptr + (enc->msg_len / 4); in_end = in_ptr + (enc->msg_len / 4);
if (enc->seq_no & 1) { if (enc->msg_id & 1) {
insert_seqno (c->session, enc->seq_no); insert_msg_id (c->session, enc->msg_id);
} }
assert (c->session->session_id == enc->session_id); assert (c->session->session_id == enc->session_id);
rpc_execute_answer (c, enc->msg_id); rpc_execute_answer (c, enc->msg_id);
......
...@@ -44,7 +44,8 @@ ...@@ -44,7 +44,8 @@
#define POLLRDHUP 0 #define POLLRDHUP 0
#endif #endif
DEFINE_TREE(int,int,int_cmp,0) #define long_cmp(a,b) ((a) > (b) ? 1 : (a) == (b) ? 0 : -1)
DEFINE_TREE(long,long long,long_cmp,0)
double get_utime (int clock_id); double get_utime (int clock_id);
int verbosity; int verbosity;
...@@ -208,6 +209,20 @@ void flush_out (struct connection *c UU) { ...@@ -208,6 +209,20 @@ void flush_out (struct connection *c UU) {
struct connection *Connections[MAX_CONNECTIONS]; struct connection *Connections[MAX_CONNECTIONS];
int max_connection_fd; int max_connection_fd;
void rotate_port (struct connection *c) {
switch (c->port) {
case 443:
c->port = 80;
break;
case 80:
c->port = 25;
break;
case 25:
c->port = 443;
break;
}
}
struct connection *create_connection (const char *host, int port, struct session *session, struct connection_methods *methods) { struct connection *create_connection (const char *host, int port, struct session *session, struct connection_methods *methods) {
struct connection *c = malloc (sizeof (*c)); struct connection *c = malloc (sizeof (*c));
memset (c, 0, sizeof (*c)); memset (c, 0, sizeof (*c));
...@@ -332,6 +347,7 @@ void fail_connection (struct connection *c) { ...@@ -332,6 +347,7 @@ void fail_connection (struct connection *c) {
if (c->state == conn_ready || c->state == conn_connecting) { if (c->state == conn_ready || c->state == conn_connecting) {
stop_ping_timer (c); stop_ping_timer (c);
} }
rotate_port (c);
struct connection_buffer *b = c->out_head; struct connection_buffer *b = c->out_head;
while (b) { while (b) {
struct connection_buffer *d = b; struct connection_buffer *d = b;
...@@ -349,7 +365,7 @@ void fail_connection (struct connection *c) { ...@@ -349,7 +365,7 @@ void fail_connection (struct connection *c) {
c->out_bytes = c->in_bytes = 0; c->out_bytes = c->in_bytes = 0;
close (c->fd); close (c->fd);
Connections[c->fd] = 0; Connections[c->fd] = 0;
logprintf ("Lost connection to server... \n"); logprintf ("Lost connection to server... %s:%d\n", c->ip, c->port);
restart_connection (c); restart_connection (c);
} }
...@@ -572,25 +588,26 @@ void connections_poll_result (struct pollfd *fds, int max) { ...@@ -572,25 +588,26 @@ void connections_poll_result (struct pollfd *fds, int max) {
int send_all_acks (struct session *S) { int send_all_acks (struct session *S) {
clear_packet (); clear_packet ();
out_int (CODE_msgs_ack); out_int (CODE_msgs_ack);
out_int (tree_count_int (S->ack_tree)); out_int (CODE_vector);
out_int (tree_count_long (S->ack_tree));
while (S->ack_tree) { while (S->ack_tree) {
int x = tree_get_min_int (S->ack_tree); long long x = tree_get_min_long (S->ack_tree);
out_int (x); out_long (x);
S->ack_tree = tree_delete_int (S->ack_tree, x); S->ack_tree = tree_delete_long (S->ack_tree, x);
} }
encrypt_send_message (S->c, packet_buffer, packet_ptr - packet_buffer, 0); encrypt_send_message (S->c, packet_buffer, packet_ptr - packet_buffer, 0);
return 0; return 0;
} }
void insert_seqno (struct session *S, int seqno) { void insert_msg_id (struct session *S, long long id) {
if (!S->ack_tree) { if (!S->ack_tree) {
S->ev.alarm = (void *)send_all_acks; S->ev.alarm = (void *)send_all_acks;
S->ev.self = (void *)S; S->ev.self = (void *)S;
S->ev.timeout = get_double_time () + ACK_TIMEOUT; S->ev.timeout = get_double_time () + ACK_TIMEOUT;
insert_event_timer (&S->ev); insert_event_timer (&S->ev);
} }
if (!tree_lookup_int (S->ack_tree, seqno)) { if (!tree_lookup_long (S->ack_tree, id)) {
S->ack_tree = tree_insert_int (S->ack_tree, seqno, lrand48 ()); S->ack_tree = tree_insert_long (S->ack_tree, id, lrand48 ());
} }
} }
......
...@@ -30,7 +30,7 @@ struct dc; ...@@ -30,7 +30,7 @@ struct dc;
#define TG_VERSION "0.01-beta" #define TG_VERSION "0.01-beta"
#define ACK_TIMEOUT 60 #define ACK_TIMEOUT 1
#define MAX_DC_ID 10 #define MAX_DC_ID 10
enum dc_state { enum dc_state {
...@@ -57,7 +57,7 @@ struct session { ...@@ -57,7 +57,7 @@ struct session {
long long session_id; long long session_id;
int seq_no; int seq_no;
struct connection *c; struct connection *c;
struct tree_int *ack_tree; struct tree_long *ack_tree;
struct event_timer ev; struct event_timer ev;
}; };
...@@ -143,7 +143,7 @@ struct connection *create_connection (const char *host, int port, struct session ...@@ -143,7 +143,7 @@ struct connection *create_connection (const char *host, int port, struct session
int connections_make_poll_array (struct pollfd *fds, int max); int connections_make_poll_array (struct pollfd *fds, int max);
void connections_poll_result (struct pollfd *fds, int max); void connections_poll_result (struct pollfd *fds, int max);
void dc_create_session (struct dc *DC); void dc_create_session (struct dc *DC);
void insert_seqno (struct session *S, int seqno); void insert_msg_id (struct session *S, long long id);
struct dc *alloc_dc (int id, char *ip, int port); struct dc *alloc_dc (int id, char *ip, int port);
#define GET_DC(c) (c->session->dc) #define GET_DC(c) (c->session->dc)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment