
Building a database from scratch - 4

July 22, 2021

9 min read

An introduction to database fundamentals
Tim Armstrong, Founder, Consultant Engineer, Expert in Performance & Security

Hey, welcome back for Part 4 on building a database from scratch.

In this part of the series, we’ll build out the UDP Server that we described in Part 1 / Architecture. We’re also going to cover the essentials of protocol design and some of the ways you can protect your servers from bad actors.

If you’re just joining, this part is a really odd place to jump in, so I highly recommend reading Parts 1–3 first.

If you’ve been following along, we have now covered the architecture, the data structures, and how to store and retrieve data, so it’s time to start formalising the code and building the server process for our database.

As usual, you can grab a copy of the code from where we left off over at GitLab.

Recap

Let’s start with a quick recap of the code we’ve written so far:

Models

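from ctypes import Array, Structure, c_char, c_double


class Ident(Array):
    # Fixed-width 36-byte identifier (room for a canonical UUID string)
    _length_ = 36
    _type_ = c_char


class Sample(Structure, AsDictMixin):  # AsDictMixin: the dict-conversion mixin from an earlier part
    _fields_ = [("ident", Ident), ("sample", c_double), ("timestamp", c_double)]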

Here we defined a structure called Sample that consists of a 36 Byte identifier string, a double-precision floating-point number for the sample value, and a double-precision floating-point number for the timestamp.
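The size check later in this part expects exactly 56 bytes, even though the fields only add up to 52 (36 + 8 + 8). The difference is alignment padding: on typical 64-bit platforms ctypes aligns each c_double to an 8-byte boundary, so four padding bytes follow the identifier. A small sketch for inspecting the layout, assuming the same field definitions (the mixin is left out because it doesn't affect layout):

```python
from ctypes import Array, Structure, c_char, c_double, sizeof


class Ident(Array):
    # Fixed-width 36-byte identifier field
    _length_ = 36
    _type_ = c_char


class Sample(Structure):
    # Same fields as the model above; the dict mixin is omitted because it doesn't change the layout
    _fields_ = [("ident", Ident), ("sample", c_double), ("timestamp", c_double)]


# Each field descriptor on the class reports where that field lives inside the struct
for name, _ in Sample._fields_:
    field = getattr(Sample, name)
    print(f"{name:<10} offset={field.offset:>2} size={field.size}")

print("total size:", sizeof(Sample))
```

On a typical x86-64 machine this reports offsets 0, 40 and 48 and a total size of 56; the bytes at offsets 36 to 39 are padding, and the exact padding is platform-dependent.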

To make life easier for ourselves, we also created a generic mixin-class that added the ability to easily convert the struct into a dictionary.
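The original mixin isn't reproduced in this recap, so here is a minimal, hypothetical version of what such a dict-conversion mixin could look like; the method name as_dict is an assumption, not necessarily what the earlier part used. It relies on the fact that every ctypes Structure lists its fields in _fields_:

```python
from ctypes import Array, Structure, c_char, c_double


class AsDictMixin:
    """Hypothetical dict-conversion mixin; the original from an earlier part may differ."""

    def as_dict(self) -> dict:
        # ctypes keeps every field name in _fields_, so walk that list generically
        return {name: getattr(self, name) for name, *_ in self._fields_}


class Ident(Array):
    _length_ = 36
    _type_ = c_char


class Sample(Structure, AsDictMixin):
    _fields_ = [("ident", Ident), ("sample", c_double), ("timestamp", c_double)]


print(Sample(b"Hello, world!", 0.1, 0.01).as_dict())
# {'ident': b'Hello, world!', 'sample': 0.1, 'timestamp': 0.01}
```

Because ctypes returns a c_char array field as bytes (cut at the first NUL), the ident comes back as a plain bytes object rather than an array instance.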

Database

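import lmdb

Gi = 1024 ** 3  # assumed definition: 1 GiB map size

ENV = lmdb.Environment(
    "example.lmdb",
    map_size=Gi,
    subdir=True,
    readonly=False,
    metasync=True,
    sync=True,
    map_async=False,
    mode=0o755,  # same value as decimal 493
    create=True,
    readahead=True,
    writemap=False,
    meminit=True,
    max_readers=126,
    max_dbs=2,  # one named sub-database per index
    max_spare_txns=1,
    lock=True,
)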
KEYS_BY_IDENTITY = ENV.open_db("idenity_index".encode(), dupsort=True)
KEYS_BY_TIMESTAMP = ENV.open_db("timestamp_index".encode(), dupsort=True)


def write_sample(ident: str, sample: float, timestamp: float) -> Sample:
    ident_encoded = ident.encode()
    input_sample = Sample(ident=ident_encoded, sample=sample, timestamp=timestamp)
    global ENV, KEYS_BY_IDENTITY, KEYS_BY_TIMESTAMP
    with ENV.begin(write=True) as txn:
        key = input_sample.ident + b"/" + int(input_sample.timestamp*10).to_bytes(8, byteorder="big")
        txn.put(key, bytes(input_sample))
        txn.put(input_sample.ident, key, db=KEYS_BY_IDENTITY)
        txn.put(int(timestamp).to_bytes(8, byteorder="big"), key, db=KEYS_BY_TIMESTAMP)
    return input_sample

Here we opened the storage environment, added a sub-database for the Identity index, and one for the Timestamp index. We also drafted a way to write a sample to the database, and a way to read samples from the database by getting all of the samples that had a matching index.
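A detail worth spelling out in write_sample is why the timestamp part of the key is encoded big-endian. LMDB compares keys byte by byte by default, so with the most significant byte first, byte order matches numeric order and a cursor walks one ident's samples chronologically. A quick sketch using a hypothetical make_key helper that mirrors the key expression above:

```python
def make_key(ident: bytes, timestamp: float) -> bytes:
    """Hypothetical helper mirroring write_sample: ident, a slash, then deciseconds as 8 big-endian bytes."""
    return ident + b"/" + int(timestamp * 10).to_bytes(8, byteorder="big")


ident = b"3f2b8c1e-9a4d-4c6e-8b7f-1a2b3c4d5e6f"
timestamps = [1626912000.5, 1626912255.0, 1626912000.0, 1626998400.0]

# Sorting raw key bytes, as LMDB does by default, gives chronological order
keys = sorted(make_key(ident, ts) for ts in timestamps)
decoded = [int.from_bytes(key[-8:], byteorder="big") / 10 for key in keys]
print(decoded)  # [1626912000.0, 1626912000.5, 1626912255.0, 1626998400.0]

# The same integers encoded little-endian would not sort chronologically
little = sorted(timestamps, key=lambda ts: int(ts * 10).to_bytes(8, byteorder="little"))
print(little == sorted(timestamps))  # False
```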

There are a couple of things to note at this point. While we can insert and retrieve data, we are missing a lot of the basic functionality that you’d expect from a database. Commonly we’d expect a database to support CRUD (Create, Read, Update, Delete) at a minimum; however, as we are building an application-specific database and have no need for updates, deletes, or reading individual samples, there is very little point in implementing these features. Should we need them in the future, we can build them with relative ease, so it’s best to leave them until then; the alternative is carrying unused cruft that will stagnate and could lead to problems down the road.

ECHO… Echo…echo

Looking back at our spec from Part 1, the apparatus needs us to reflect the timestamp back as fast as possible. This acts, in effect, as an ACK (acknowledgement) that prevents the apparatus from re-transmitting the sample, so the first step in building our server is a system that successfully ACKs the samples.

To do this we’ll use Twisted, one of the oldest networking libraries for Python that is still actively maintained. The key benefit of using Twisted here is that it abstracts away all of the main-loop / reactor / listener machinery, along with buffer handling, allowing us to focus on the protocol without needing to think about generic network-processing boilerplate. (Reassembly of fragmented UDP datagrams is done by the operating system's IP stack before the data ever reaches Twisted.)

Kicking things off then, we need to import DatagramProtocol (UDP handler) and reactor (main-loop / socket listener):

from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor

Next, let’s implement the minimum spec of the protocol by sub-classing DatagramProtocol and overriding its datagramReceived method. In this method we’ll need to parse the inbound packet, which, rather helpfully, happens to match the structure we defined for our database model, Sample. So let’s import that and parse the inbound packet:

from database import Sample

class Echo(DatagramProtocol):
    def datagramReceived(self, data, addr):
        sample = Sample.from_buffer_copy(data)
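It's worth knowing how forgiving from_buffer_copy is: it copies sizeof(Sample) bytes from the start of the buffer, raising ValueError if the buffer is too short but silently ignoring anything extra. A minimal, standalone sketch (the Sample definition is repeated so it runs on its own):

```python
from ctypes import Array, Structure, c_char, c_double, sizeof


class Ident(Array):
    _length_ = 36
    _type_ = c_char


class Sample(Structure):
    _fields_ = [("ident", Ident), ("sample", c_double), ("timestamp", c_double)]


packet = bytes(Sample(b"Hello, world!", 0.1, 0.01))  # what a client would put on the wire
parsed = Sample.from_buffer_copy(packet)             # copies the bytes into a fresh struct
print(parsed.ident, parsed.sample, parsed.timestamp)  # b'Hello, world!' 0.1 0.01

# A buffer shorter than sizeof(Sample) is refused outright...
try:
    Sample.from_buffer_copy(packet[:20])
    short_rejected = False
except ValueError:
    short_rejected = True

# ...but trailing bytes beyond sizeof(Sample) are silently ignored
padded = Sample.from_buffer_copy(packet + b"junk")
print(short_rejected, padded.timestamp)  # True 0.01
```

That asymmetry is why an exact length check on inbound packets is worth having before parsing.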

Okay, that’s great, but as I mentioned we need to respond with just the timestamp, and defining an entire Structure class to carry a single variable seems a little excessive. Fortunately, Python has another way to handle binary structures: the struct module.

Using the struct module is pretty straightforward. There are two functions you commonly need to care about: pack and unpack. Both take a special format string as their first argument, which tells Python what the binary representation of our data should look like. For pack, this is followed by values matching the types and positions defined in the format string; for unpack, it is followed by the buffer to read from, and the result is always a tuple. Extending the datagramReceived method then:

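import struct  # at the top of the file

def datagramReceived(self, data, addr):
    sample = Sample.from_buffer_copy(data)
    # Reply with only the 8-byte timestamp, which acts as the ACK
    self.transport.write(struct.pack("d", sample.timestamp), addr)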
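A short sketch of the struct calls in play. The "d" format uses the host's native byte order, which is fine while client and server share an architecture; if the apparatus used a different byte order (an assumption, since its wire format isn't spelled out here), a prefix such as "!" (network order, big-endian) or "<" would pin it down:

```python
import struct
import sys

timestamp = 1626912000.25

native = struct.pack("d", timestamp)    # native byte order and size, as in the Echo reply
network = struct.pack("!d", timestamp)  # explicitly big-endian ("network") order

print(struct.calcsize("d"), len(native))  # 8 8: one IEEE 754 double
print(struct.unpack("d", native))         # (1626912000.25,): unpack always returns a tuple

# On a little-endian host (x86-64, most ARM) the two encodings are byte-reversed
print(sys.byteorder, network == (native[::-1] if sys.byteorder == "little" else native))
```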

Finally, we need to configure the listener and run the main loop:

reactor.listenUDP(9999, Echo())
reactor.run()

To test this, run the file, then open a Python console and import socket, struct, and Sample:

>>> import socket
>>> import struct
>>> from database import Sample

Then prepare a UDP socket and a sample:

>>> sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
>>> sample = Sample(b"Hello, world!", 0.1, 0.01)

Finally, send a packet, listen for the response, and validate the value we receive back:

>>> sock.sendto(bytes(sample), ("127.0.0.1", 9999))
56
>>> data, addr = sock.recvfrom(1024)  # setting the receive buffer size to an arbitrary 1024 bytes
>>> struct.unpack("d", data)
(0.01,)

Nice, it works!

UDP = Unlimited Data Problems…

When we’re working with UDP we need to remember a couple of key things:

  1. UDP is stateless and doesn’t have the concept of a connection (as TCP does)
  2. UDP packets can arrive out of order, or not at all (especially if fragmented)

This means we need to be really careful when we design our UDP-based protocols. If we’re sending large amounts of data in either direction, we need to ensure that we can reconstruct the original order (or discard late packets when dealing with real-time data such as games or live video streams). If we’re sending a response from a server to a client, we need to ensure that we can’t be used as part of a UDP amplification attack: because the source address of a UDP packet is trivially spoofed, an attacker can send us small requests that appear to come from a victim and have our replies flood the victim instead. So we must design our protocol such that the first response to an incoming message is never larger than the message we received.
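The amplification rule can be checked mechanically: divide the size of the reply by the size of the request, and anything above 1.0 means an attacker gets more traffic out of us than they put in. For the protocol in this part that is an 8-byte ACK against a 56-byte sample. A tiny sketch, with amplification_factor as a hypothetical helper name:

```python
import struct

REQUEST_SIZE = 56                     # sizeof(Sample): 36-byte ident + padding + two doubles
RESPONSE_SIZE = struct.calcsize("d")  # the ACK is a single 8-byte double


def amplification_factor(request_size: int, response_size: int) -> float:
    """Bytes sent back per byte received; anything above 1.0 helps an attacker."""
    return response_size / request_size


factor = amplification_factor(REQUEST_SIZE, RESPONSE_SIZE)
print(f"{factor:.3f}")  # 0.143
assert factor <= 1.0, "reply must never be larger than the request"
```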

Lastly, because we can’t ever truly trust the origin of a UDP packet, input validation is essential! The safest move from a security perspective is to refuse to respond to a packet that fails input validation, and have the client re-transmit if it doesn’t get an ACK within a certain time window. However, in some cases you might want to provide back-pressure if the server is overloaded or failing (an HTTP equivalent here would be the 5xx series of errors / status messages).
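The client side of that contract can be sketched with nothing but the standard library: send, wait a bounded time for the echoed timestamp, and re-transmit on silence. The apparatus's real timeout and retry count aren't given here, so send_with_retry, its timeout and retries defaults, and the stand-in server (plain sockets, not the Twisted server) are all hypothetical:

```python
import socket
import struct
import threading


def send_with_retry(packet: bytes, timestamp: float, addr, timeout: float = 0.5, retries: int = 3) -> bool:
    """Hypothetical client: re-send `packet` until `timestamp` is echoed back, or give up."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)
        for attempt in range(retries):
            sock.sendto(packet, addr)
            try:
                data, _sender = sock.recvfrom(1024)
            except socket.timeout:
                continue  # no ACK inside the window: transmit again
            # Only an 8-byte reply carrying our own timestamp counts as the ACK
            if len(data) == 8 and struct.unpack("d", data)[0] == timestamp:
                return True
    return False


def ack_after_drops(sock: socket.socket, drop_first: int) -> None:
    """Stand-in server: discard `drop_first` datagrams (simulated loss), then ACK the next one."""
    for _ in range(drop_first):
        sock.recvfrom(1024)
    data, addr = sock.recvfrom(1024)
    timestamp = struct.unpack_from("d", data, 48)[0]  # the timestamp field sits at byte offset 48
    sock.sendto(struct.pack("d", timestamp), addr)


server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(("127.0.0.1", 0))  # port 0 lets the OS choose a free port
threading.Thread(target=ack_after_drops, args=(server, 1), daemon=True).start()

ts = 1626912000.25
packet = bytes(40) + struct.pack("dd", 0.5, ts)  # 56 bytes laid out like a Sample (zeroed ident + padding)
acked = send_with_retry(packet, ts, server.getsockname())
print(acked)  # True, on the second attempt
```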

In the protocol that we’re implementing here, most of those considerations are already accounted for: the response is smaller than the request, the apparatus re-transmits if a message doesn’t get an ACK, and inbound packets have a fixed size and structure. This makes our validation quite simple. First, we’ll verify the size of the received packet, which should be exactly 56 bytes (the size of Sample):

if len(data) != 56:
    return

Next, we want to validate that the packet parses, which is where it gets really interesting. When we use the Structure.from_buffer_copy method, we’re asking Python to copy the raw bytes straight into the struct without checking them. So any packet of the right length will “parse”; the result might not be correct, but ctypes will happily make some sense of whatever stream of bits it received.

This means we need to validate the input manually against sensible constraints. So what are reasonable constraints? The manual for the apparatus states that the measured samples will be “a double-precision floating point value between 0 and 1 (inclusive)”, so that’s pretty easy, but what about the other variables? We know that an ident is a UUID in the canonical text format, so we can validate that it parses as one. Lastly, we know that the timestamp will be less than 1 second old.

So let’s implement those as filter functions:
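To see how little “parsing” guarantees, here is a sketch that feeds 56 bytes of 0xFF into from_buffer_copy. Nothing raises; instead both doubles come out as NaN and the identifier is not valid UTF-8. NaN compares false with everything, so a simple range check already rejects it:

```python
import math
from ctypes import Array, Structure, c_char, c_double


class Ident(Array):
    _length_ = 36
    _type_ = c_char


class Sample(Structure):
    _fields_ = [("ident", Ident), ("sample", c_double), ("timestamp", c_double)]


garbage = Sample.from_buffer_copy(b"\xff" * 56)  # 56 bytes of noise: no error is raised

# An all-ones bit pattern decodes as NaN for both doubles
print(math.isnan(garbage.sample), math.isnan(garbage.timestamp))  # True True

# NaN fails every comparison, so a plain range check rejects it
print(0 <= garbage.sample <= 1)  # False

try:
    garbage.ident.decode("utf-8")
    ident_decodes = True
except UnicodeDecodeError:
    ident_decodes = False  # the byte 0xFF never appears in valid UTF-8
print(ident_decodes)  # False
```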

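from datetime import datetime
from uuid import UUID

def valid_timestamp(timestamp: float) -> bool:
    # "Less than 1 second old"; assumes the apparatus and server clocks agree, so future timestamps fail too
    age = datetime.now().timestamp() - timestamp
    return 0 <= age < 1

def valid_sample(sample: float) -> bool:
    # Between 0 and 1 inclusive, per the apparatus manual (NaN fails this check)
    return 0 <= sample <= 1

def valid_ident(ident: bytes) -> bool:
    try:
        str_ident = ident.decode("utf-8")
    except UnicodeDecodeError:
        return False
    try:
        UUID(str_ident)  # raises ValueError if the text isn't a UUID
    except ValueError:
        return False
    return True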
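One caveat with valid_ident: uuid.UUID is more lenient than “the canonical text format”. It also accepts, for example, 32 bare hex digits or hex wrapped in braces, and both fit in the 36-byte field. If strictness matters, a hypothetical valid_ident_strict could require the 36-character hyphenated form by comparing against UUID's own string output:

```python
from uuid import UUID

canonical = "3f2b8c1e-9a4d-4c6e-8b7f-1a2b3c4d5e6f"
variants = [
    canonical,
    canonical.upper(),                       # upper-case hex digits
    canonical.replace("-", ""),              # 32 bare hex digits
    "{" + canonical.replace("-", "") + "}",  # braces around bare hex
]

# uuid.UUID normalises every variant to the same value, and all of them fit in 36 bytes
print([str(UUID(v)) == canonical for v in variants])  # [True, True, True, True]


def valid_ident_strict(ident: bytes) -> bool:
    """Hypothetical stricter check: only the 36-character hyphenated form (either case) passes."""
    try:
        text = ident.decode("ascii")
        return len(text) == 36 and str(UUID(text)) == text.lower()
    except (UnicodeDecodeError, ValueError):
        return False


print([valid_ident_strict(v.encode()) for v in variants])  # [True, True, False, False]
```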

And for convenience let’s extend the Sample struct with an is_valid method:

def is_valid(self) -> bool:
    # Every check must pass (and, not or): a single failure rejects the packet
    return valid_sample(self.sample) and valid_timestamp(self.timestamp) and valid_ident(self.ident)

Now we can update the datagramReceived method of our Echo protocol to validate the sample:

class Echo(DatagramProtocol):

    def datagramReceived(self, data, addr):
        if len(data) != 56:
            print("Bad Packet: wrong size")
            return
        sample = Sample.from_buffer_copy(data)
        if not sample.is_valid():
            return
        self.transport.write(struct.pack("d", sample.timestamp), addr)
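Because datagramReceived is essentially bytes in, bytes (or nothing) out, the decision logic can be pulled into a plain function and tested without starting the reactor. A sketch under that assumption: handle_datagram and sample_is_valid are hypothetical names, the injectable now argument exists only to make the timestamp check testable, and the checks follow the constraints described above (value in [0, 1], timestamp under 1 second old, ident parses as a UUID), all of which must pass:

```python
import struct
from ctypes import Array, Structure, c_char, c_double, sizeof
from datetime import datetime
from typing import Optional
from uuid import UUID


class Ident(Array):
    _length_ = 36
    _type_ = c_char


class Sample(Structure):
    _fields_ = [("ident", Ident), ("sample", c_double), ("timestamp", c_double)]


def sample_is_valid(sample: Sample, now: float) -> bool:
    """Every check must pass: value in [0, 1], timestamp under 1 s old, ident parses as a UUID."""
    if not 0 <= sample.sample <= 1:
        return False
    if not 0 <= now - sample.timestamp < 1:
        return False
    try:
        UUID(sample.ident.decode("utf-8"))
    except (UnicodeDecodeError, ValueError):
        return False
    return True


def handle_datagram(data: bytes, now: Optional[float] = None) -> Optional[bytes]:
    """Hypothetical reactor-free core of Echo.datagramReceived: the ACK to send, or None to stay silent."""
    if len(data) != sizeof(Sample):
        return None
    sample = Sample.from_buffer_copy(data)
    if now is None:
        now = datetime.now().timestamp()
    if not sample_is_valid(sample, now):
        return None
    return struct.pack("d", sample.timestamp)


now = 1626912000.0
good = bytes(Sample(b"3f2b8c1e-9a4d-4c6e-8b7f-1a2b3c4d5e6f", 0.5, now - 0.25))
print(handle_datagram(good, now) == struct.pack("d", now - 0.25))  # True
print(handle_datagram(good[:-1], now))  # None: wrong size
print(handle_datagram(good, now + 5))   # None: timestamp more than 1 s old
```

Echo.datagramReceived could then reduce to calling this function and writing the reply whenever it isn't None.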

Write [back to where we started]

In Part 2 we defined our write_sample function to accept ident, sample, and timestamp as separate arguments, but now that we’re looking at the real data coming from our UDP service, Echo, this doesn’t make much sense anymore. So let’s rewrite it to take a Sample struct:

def write_sample(input_sample: Sample) -> None:
    global ENV, KEYS_BY_IDENTITY, KEYS_BY_TIMESTAMP
    with ENV.begin(write=True) as txn:
        key = input_sample.ident + b"/" + int(input_sample.timestamp*10).to_bytes(8, byteorder="big")
        txn.put(key, bytes(input_sample))
        txn.put(input_sample.ident, key, db=KEYS_BY_IDENTITY)
        txn.put(int(input_sample.timestamp).to_bytes(8, byteorder="big"), key, db=KEYS_BY_TIMESTAMP)

This leaves bulk_write and write_sample looking very similar. While we could clean this up by moving the common code into its own function, we’re just going to remove bulk_write completely, as it was only created as a pedagogical example.

Finally, we want to put a call to write_sample into our datagramReceived method just before the response, ensuring that we don’t send the ACK unless we successfully stored the record:

def datagramReceived(self, data, addr):
    if len(data) != 56:
        print("Bad Packet: wrong size")
        return
    sample = Sample.from_buffer_copy(data)
    if not sample.is_valid():
        return
    write_sample(sample)
    self.transport.write(struct.pack("d", sample.timestamp), addr)
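The ordering matters because of how exceptions flow: if write_sample raises, the exception leaves datagramReceived before transport.write runs, so no ACK is sent and the apparatus re-transmits. A retransmitted sample builds the same key, and LMDB's put replaces the value for an existing key by default, so the retry is harmless for the main database. A sketch of both behaviours with a dict standing in for LMDB and a fake transport; receive, make_key and FakeTransport are hypothetical:

```python
import struct


def make_key(ident: bytes, timestamp: float) -> bytes:
    # Same shape as the key built in write_sample (hypothetical helper)
    return ident + b"/" + int(timestamp * 10).to_bytes(8, byteorder="big")


class FakeTransport:
    """Stand-in for Twisted's transport that records replies instead of sending them."""

    def __init__(self) -> None:
        self.sent = []

    def write(self, data: bytes, addr) -> None:
        self.sent.append((data, addr))


def receive(payload: bytes, ident: bytes, timestamp: float, addr, store, transport) -> None:
    """Hypothetical mirror of the end of datagramReceived: store first, ACK second."""
    store(make_key(ident, timestamp), payload)          # if this raises, the ACK below never runs
    transport.write(struct.pack("d", timestamp), addr)  # reached only after a successful write


def failing_store(key: bytes, value: bytes) -> None:
    raise OSError("simulated storage failure")


transport = FakeTransport()
addr = ("127.0.0.1", 50000)
ident, ts, payload = b"3f2b8c1e-9a4d-4c6e-8b7f-1a2b3c4d5e6f", 1626912000.5, b"raw sample bytes"

try:
    receive(payload, ident, ts, addr, failing_store, transport)
except OSError:
    pass
print(len(transport.sent))  # 0: the write failed, so no ACK went out

# The apparatus re-transmits; both copies map to the same key, so the dict keeps one record
db = {}
receive(payload, ident, ts, addr, db.__setitem__, transport)
receive(payload, ident, ts, addr, db.__setitem__, transport)
print(len(db), len(transport.sent))  # 1 2
```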

And there we have it, the UDP Service is finished.

The code

If you want to take a look at the final code from this part of the tutorial it’s available here.
