$ git clone https://socialnetwork.ion.nu/socialnetwork.git
commit e5f793bcd508cce2e43ee4d87c73a8471cedd982
Author: Alicia <...>
Date: Sun Jan 8 14:27:00 2017 +0100
Improved udpstream to handle disconnects, intentional and timeouts.
diff --git a/udpstream.c b/udpstream.c
index dbd1049..2f48f5a 100644
--- a/udpstream.c
+++ b/udpstream.c
@@ -19,12 +19,19 @@
#include <string.h>
#include <stdlib.h>
#include <errno.h>
+#include <time.h>
#include <sys/socket.h>
#include "udpstream.h"
#define TYPE_PAYLOAD 0
#define TYPE_ACK 1
#define TYPE_RESEND 2
+#define TYPE_INIT 3 // Should be at the start of each connection
+#define TYPE_CLOSE 4 // Requesting to close the stream
+#define TYPE_CLOSED 5 // Confirming stream closure
+#define TYPE_PING 6
+#define TYPE_PONG 7
+#define TYPE_RESET 8
#define HEADERSIZE (sizeof(uint32_t)+sizeof(uint16_t)+sizeof(uint8_t))
// TODO: Handle stale connections, disconnects, maybe a connect message type?
@@ -35,6 +42,10 @@ struct packet
unsigned int buflen;
};
+#define STATE_INIT 1
+#define STATE_CLOSING 2
+#define STATE_CLOSED 4
+#define STATE_PING 8
struct udpstream
{
int sock;
@@ -48,13 +59,15 @@ struct udpstream
unsigned int recvpacketcount;
char* buf; // Received but unparsed data
unsigned int buflen;
+ unsigned char state;
+ time_t timestamp;
// TODO: add void pointer to keep relevant application data? plus a function to free it if the connection is closed or abandoned as stale
};
static struct udpstream** streams=0;
static unsigned int streamcount=0;
-struct udpstream* udpstream_new(int sock, struct sockaddr* addr, socklen_t addrlen)
+static struct udpstream* stream_new(int sock, struct sockaddr* addr, socklen_t addrlen)
{
struct udpstream* stream=malloc(sizeof(struct udpstream));
stream->sock=sock;
@@ -68,6 +81,8 @@ struct udpstream* udpstream_new(int sock, struct sockaddr* addr, socklen_t addrl
stream->recvpacketcount=0;
stream->buf=0;
stream->buflen=0;
+ stream->state=0; // Start new streams as invalid, need to init
+ stream->timestamp=time(0);
++streamcount;
streams=realloc(streams, sizeof(void*)*streamcount);
streams[streamcount-1]=stream;
@@ -90,12 +105,12 @@ static struct udpstream* stream_find(struct sockaddr* addr, socklen_t addrlen)
static ssize_t stream_send(struct udpstream* stream, uint8_t type, uint16_t seq, uint32_t size, const void* buf)
{
// TODO: Include a checksum in the header?
- unsigned char header[HEADERSIZE];
- memcpy(header, &size, sizeof(uint32_t));
- memcpy(header+sizeof(uint32_t), &seq, sizeof(uint16_t));
- memcpy(header+sizeof(uint32_t)+sizeof(uint16_t), &type, sizeof(uint8_t));
- sendto(stream->sock, header, HEADERSIZE, 0, &stream->addr, stream->addrlen);
- return sendto(stream->sock, buf, size, 0, &stream->addr, stream->addrlen);
+ unsigned char packet[HEADERSIZE+size];
+ memcpy(packet, &size, sizeof(uint32_t));
+ memcpy(packet+sizeof(uint32_t), &seq, sizeof(uint16_t));
+ memcpy(packet+sizeof(uint32_t)+sizeof(uint16_t), &type, sizeof(uint8_t));
+ memcpy(packet+HEADERSIZE, buf, size);
+ return sendto(stream->sock, packet, HEADERSIZE+size, 0, &stream->addr, stream->addrlen);
}
static void udpstream_requestresend(struct udpstream* stream, uint16_t seq)
@@ -123,14 +138,48 @@ static void udpstream_requestresend(struct udpstream* stream, uint16_t seq)
stream_send(stream, TYPE_RESEND, 0, missedcount*sizeof(uint16_t), missed);
}
+static void stream_free(struct udpstream* stream)
+{
+ free(stream->buf);
+ unsigned int i;
+ for(i=0; i<stream->recvpacketcount; ++i)
+ {
+ free(stream->recvpackets[i].buf);
+ }
+ free(stream->recvpackets);
+ for(i=0; i<stream->sentpacketcount; ++i)
+ {
+ free(stream->sentpackets[i].buf);
+ }
+ free(stream->sentpackets);
+ free(stream);
+ for(i=0; i<streamcount; ++i)
+ {
+ if(streams[i]==stream)
+ {
+ --streamcount;
+ memmove(&streams[i], &streams[i+1], sizeof(void*)*(streamcount-i));
+ }
+ }
+}
+
+struct udpstream* udpstream_new(int sock, struct sockaddr* addr, socklen_t addrlen)
+{
+ struct udpstream* stream=stream_new(sock, addr, addrlen);
+ stream->state=STATE_INIT; // If we're creating the stream we're the ones initializing it
+ stream_send(stream, TYPE_INIT, 0, 0, 0);
+ return stream;
+}
+
void udpstream_readsocket(int sock)
{
+ time_t now=time(0);
char buf[1024];
struct sockaddr addr;
socklen_t addrlen=sizeof(addr);
ssize_t len=recvfrom(sock, buf, 1024, 0, &addr, &addrlen);
struct udpstream* stream=stream_find(&addr, addrlen);
- if(!stream){stream=udpstream_new(sock, &addr, addrlen);}
+ if(!stream){stream=stream_new(sock, &addr, addrlen);}
stream->buflen+=len;
stream->buf=realloc(stream->buf, stream->buflen);
memcpy(stream->buf+(stream->buflen-len), buf, len);
@@ -145,8 +194,18 @@ void udpstream_readsocket(int sock)
// Complete packet available
memcpy(&seq, stream->buf+sizeof(uint32_t), sizeof(uint16_t));
memcpy(&type, stream->buf+sizeof(uint32_t)+sizeof(uint16_t), sizeof(uint8_t));
- if(type==TYPE_ACK) // Handle acknowledgement of sent packet
+ stream->timestamp=now;
+ if(!(stream->state&STATE_INIT) && type!=TYPE_INIT)
+ {
+ // Ditch invalid streams
+ stream_send(stream, TYPE_RESET, 0, 0, 0);
+ stream_free(stream);
+ return;
+ }
+ if((stream->state&STATE_CLOSING) && type!=TYPE_CLOSED){return;}
+ switch(type)
{
+ case TYPE_ACK: // Handle acknowledgement of sent packet
// Remove from sent messages, recipient has confirmed receiving it
if(payloadsize==sizeof(uint16_t))
{
@@ -167,47 +226,107 @@ void udpstream_readsocket(int sock)
}
stream->buflen-=(payloadsize+HEADERSIZE);
memmove(stream->buf, stream->buf+HEADERSIZE+payloadsize, stream->buflen);
- continue;
- }
- if(type==TYPE_RESEND) // TODO: Handle request to resend packets not received by the peer
- {
+ break;
+ case TYPE_RESEND: // TODO: Handle request to resend packets not received by the peer
fprintf(stderr, "TODO: resend packets\n");
stream->buflen-=(payloadsize+HEADERSIZE);
memmove(stream->buf, stream->buf+HEADERSIZE+payloadsize, stream->buflen);
- continue;
+ break;
+ case TYPE_PAYLOAD:
+ // Send ack, regardless of whether it's in the right order
+ stream_send(stream, TYPE_ACK, 0, sizeof(uint16_t), &seq);
+ // Add to list of parsed packets
+ ++stream->recvpacketcount;
+ stream->recvpackets=realloc(stream->recvpackets, sizeof(struct packet)*stream->recvpacketcount);
+ stream->recvpackets[stream->recvpacketcount-1].seq=seq;
+ stream->recvpackets[stream->recvpacketcount-1].buf=malloc(payloadsize);
+ stream->recvpackets[stream->recvpacketcount-1].buflen=payloadsize;
+ memcpy(stream->recvpackets[stream->recvpacketcount-1].buf, stream->buf+HEADERSIZE, payloadsize);
+ stream->buflen-=(payloadsize+HEADERSIZE);
+ memmove(stream->buf, stream->buf+HEADERSIZE+payloadsize, stream->buflen);
+ udpstream_requestresend(stream, seq); // Ask to resend if we're missing any packets
+ break;
+ case TYPE_INIT: // Should be at the start of each connection and must have sequence 0, size 0
+// TODO: If we receive a valid init for an already initialized stream, invalidate the old one (memset ->addr? plus STATE_CLOSED) and create a new stream to indicate a new connection?
+ if(seq || payloadsize)
+ {
+ stream_send(stream, TYPE_RESET, 0, 0, 0);
+ if(stream->state&STATE_INIT) // If it's an established stream, mark it as closed
+ {
+ stream->state|=STATE_CLOSED;
+ }else{ // Otherwise just ditch it
+ stream_free(stream);
+ return;
+ }
+ break;
+ }
+ stream->state|=STATE_INIT;
+ stream->buflen-=(payloadsize+HEADERSIZE);
+ memmove(stream->buf, stream->buf+HEADERSIZE+payloadsize, stream->buflen);
+ break;
+ case TYPE_CLOSE: // Requesting to close the stream
+ stream->state|=STATE_CLOSED;
+ stream_send(stream, TYPE_CLOSED, 0, 0, 0);
+ break;
+ case TYPE_CLOSED: // Confirming stream closure
+ if(stream->state&STATE_CLOSING)
+ {
+ stream_free(stream);
+ return;
+ }
+ break;
+ case TYPE_PING:
+ stream_send(stream, TYPE_PONG, 0, 0, 0);
+ case TYPE_PONG:
+ stream->state&=STATE_PING^0xff;
+ stream->buflen-=(payloadsize+HEADERSIZE);
+ memmove(stream->buf, stream->buf+HEADERSIZE+payloadsize, stream->buflen);
+ break;
+ case TYPE_RESET:
+ stream->state|=STATE_CLOSED;
+ break;
}
- // Send ack, regardless of whether it's in the right order
- stream_send(stream, TYPE_ACK, 0, sizeof(uint16_t), &seq);
- // Add to list of parsed packets
- ++stream->recvpacketcount;
- stream->recvpackets=realloc(stream->recvpackets, sizeof(struct packet)*stream->recvpacketcount);
- stream->recvpackets[stream->recvpacketcount-1].seq=seq;
- stream->recvpackets[stream->recvpacketcount-1].buf=malloc(payloadsize);
- stream->recvpackets[stream->recvpacketcount-1].buflen=payloadsize;
- memcpy(stream->recvpackets[stream->recvpacketcount-1].buf, stream->buf+HEADERSIZE, payloadsize);
- stream->buflen-=(payloadsize+HEADERSIZE);
- memmove(stream->buf, stream->buf+HEADERSIZE+payloadsize, stream->buflen);
- udpstream_requestresend(stream, seq); // Ask to resend if we're missing any packets
}
}
struct udpstream* udpstream_poll(void)
{
+ time_t now=time(0);
unsigned int i;
for(i=0; i<streamcount; ++i)
{
+ // Check for state changes
+ if(streams[i]->state&STATE_CLOSED){return streams[i];}
// Check for the next packet in the order
unsigned int i2;
for(i2=0; i2<streams[i]->recvpacketcount; ++i2)
{
if(streams[i]->recvpackets[i2].seq==streams[i]->inseq){return streams[i];}
}
+ // Send ping if it's been 20 seconds without any data, unless we already sent one
+ if(streams[i]->timestamp+20<now && !(streams[i]->state&STATE_PING))
+ {
+ streams[i]->state|=STATE_PING;
+ stream_send(streams[i], TYPE_PING, 0, 0, 0);
+ }
+ // Give up and consider it dead after 100 seconds more (2 minutes total)
+ else if(streams[i]->timestamp+120<now)
+ {
+ if(streams[i]->state&STATE_CLOSING) // Application already closed it
+ {
+ stream_free(streams[i]);
+ }else{
+ streams[i]->state|=STATE_CLOSED;
+ return streams[i];
+ }
+ }
}
return 0;
}
ssize_t udpstream_read(struct udpstream* stream, void* buf, size_t size)
{
+ if(stream->state&(STATE_CLOSED|STATE_CLOSING)){return 0;} // EOF, TODO: -1 and EBADFD for STATE_CLOSING?
// Check if it's any previously out of order packet's turn now
unsigned int i;
for(i=0; i<stream->recvpacketcount; ++i)
@@ -238,6 +357,7 @@ ssize_t udpstream_read(struct udpstream* stream, void* buf, size_t size)
ssize_t udpstream_write(struct udpstream* stream, const void* buf, size_t size)
{
+ if(stream->state&(STATE_CLOSED|STATE_CLOSING)){return 0;} // EOF, TODO: -1 and EBADFD for STATE_CLOSING?
// TODO: abort and return negative if sentpacketcount is too high? EWOULDBLOCK?
++stream->sentpacketcount;
stream->sentpackets=realloc(stream->sentpackets, sizeof(struct packet)*stream->sentpacketcount);
@@ -257,3 +377,14 @@ void udpstream_getaddr(struct udpstream* stream, struct sockaddr* addr, socklen_
}
int udpstream_getsocket(struct udpstream* stream){return stream->sock;}
+
+void udpstream_close(struct udpstream* stream)
+{
+ if(stream->state&STATE_CLOSED) // Closed by peer, just free it
+ {
+ stream_free(stream);
+ }else{
+ stream->state|=STATE_CLOSING;
+ stream_send(stream, TYPE_CLOSE, 0, 0, 0);
+ }
+}
diff --git a/udpstream.h b/udpstream.h
index bbc78e4..87ee173 100644
--- a/udpstream.h
+++ b/udpstream.h
@@ -36,4 +36,6 @@ extern ssize_t udpstream_write(struct udpstream* stream, const void* buf, size_t
extern void udpstream_getaddr(struct udpstream* stream, struct sockaddr* addr, socklen_t* addrlen);
extern int udpstream_getsocket(struct udpstream* stream);
+
+extern void udpstream_close(struct udpstream* stream);
#endif
diff --git a/udptest.c b/udptest.c
index 1378b8a..be25c6f 100644
--- a/udptest.c
+++ b/udptest.c
@@ -68,9 +68,9 @@ int main(int argc, char** argv)
uint32_t ip=addr.sin_addr.s_addr;
printf("From: %u.%u.%u.%u:%hu:\n", ip%0x100, (ip/0x100)%0x100, (ip/0x10000)%0x100, ip/0x1000000, ntohs(addr.sin_port));
}
- stream=rstream;
ssize_t len=udpstream_read(rstream, buf, 1024);
- if(len<1){continue;}
+ if(len<1){udpstream_close(rstream); if(stream==rstream){stream=0;} continue;}
+ stream=rstream;
write(1, buf, len);
}
}