CW Decode and Deep Learning (3)

If you prefer an input sequence that better resembles a real-world situation, one way is to train with on/off keying samples, where 1 means key down and 0 means key up for one dot length:

a, 101110
aa, 101110 101110
aal, 101110 101110 1011101010
aalii, 101110 101110 1011101010 1010 1010
    (many lines deleted)
zythia, 111011101010 11101011101110 1110 10101010 1010 101110
zythum, 111011101010 11101011101110 1110 10101010 10101110 11101110
zyzomys, 111011101010 11101011101110 111011101010 111011101110 11101110 11101011101110 101010
zyzzogeton, 111011101010 11101011101110 111011101010 111011101010 111011101110 1110111010 10 1110 111011101110 111010

It is assumed that the inter-character space is perfectly recovered and is shown as a space character.
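
The relation between the dot-dash notation and this 1/0 notation can be sketched as follows. This is only a minimal illustration; the mapping (a dot is "10", a dash is "1110") is inferred from the listing above, and the names `ELEMENTS` and `dotdash_to_keying` are hypothetical helpers that do not appear in the original programs.

```python
# Assumed keying convention, inferred from the listing above:
# a dot is one unit on plus one unit off, a dash is three units on plus one off.
ELEMENTS = {'.': '10', '-': '1110'}


def dotdash_to_keying(morse):
    """Convert e.g. '.- .-' to '101110 101110', keeping the character gaps."""
    return ' '.join(''.join(ELEMENTS[e] for e in letter)
                    for letter in morse.split(' '))


print(dotdash_to_keying('.-'))          # the letter a
print(dotdash_to_keying('.- .- .-..'))  # the word aal
```

The space between characters is carried over unchanged, which corresponds to the assumption that the inter-character space is perfectly recovered.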

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
lstm_1 (LSTM)                (None, 128)               67584     
_________________________________________________________________
repeat_vector_1 (RepeatVecto (None, 4, 128)            0         
_________________________________________________________________
lstm_2 (LSTM)                (None, 4, 128)            131584    
_________________________________________________________________
time_distributed_1 (TimeDist (None, 4, 27)             3483      
=================================================================
Total params: 202,651
Trainable params: 202,651
Non-trainable params: 0
_________________________________________________________________
Train on 4894 samples, validate on 100 samples
10111011101110 10101110 111010111010 1110101110                 juck     juck
1110101010 10101110 1011101010 1011101010                       bull     bull
101110111010 1010 1011101010 11101011101110                     pily     pily
1011101010 10101110 10 101010                                   lues     laes
1110111010 111011101110 1011101110 111010                       gown     gown
10101010 101110 10101110 1011101010                             haul     haul
1110 111011101110 1010 1011101010                               toil     toil
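
The parameter counts in the model summaries can be checked by hand. The sketch below assumes the standard LSTM layout (four gates, each with input weights, recurrent weights and a bias) and a dense output layer with a bias; the function names are hypothetical and only serve this illustration.

```python
def lstm_params(input_dim, units):
    # Four gates, each with input weights, recurrent weights and a bias.
    return 4 * (units * (input_dim + units) + units)


def dense_params(input_dim, units):
    # One weight per input/output pair plus one bias per output.
    return input_dim * units + units


def total_params(hidden_size, n_in=3, n_out=27, nlayers=1):
    # n_in = 3 input symbols ('1', '0', ' '), n_out = 26 letters plus space.
    encoder = lstm_params(n_in, hidden_size)
    decoder = nlayers * lstm_params(hidden_size, hidden_size)
    return encoder + decoder + dense_params(hidden_size, n_out)


for h in (64, 128, 256):
    print(h, total_params(h))
```

For hidden_size = 128 this gives 67584 + 131584 + 3483 = 202,651, in agreement with the summary above.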

This is with hidden_size = 256.

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
lstm_1 (LSTM)                (None, 256)               266240    
_________________________________________________________________
repeat_vector_1 (RepeatVecto (None, 4, 256)            0         
_________________________________________________________________
lstm_2 (LSTM)                (None, 4, 256)            525312    
_________________________________________________________________
time_distributed_1 (TimeDist (None, 4, 27)             6939      
=================================================================
Total params: 798,491
Trainable params: 798,491
Non-trainable params: 0
_________________________________________________________________
Train on 4894 samples, validate on 100 samples

This is with hidden_size = 64.

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
lstm_1 (LSTM)                (None, 64)                17408     
_________________________________________________________________
repeat_vector_1 (RepeatVecto (None, 4, 64)             0         
_________________________________________________________________
lstm_2 (LSTM)                (None, 4, 64)             33024     
_________________________________________________________________
time_distributed_1 (TimeDist (None, 4, 27)             1755      
=================================================================
Total params: 52,187
Trainable params: 52,187
Non-trainable params: 0
_________________________________________________________________
Train on 4894 samples, validate on 100 samples

from keras.models import Sequential
from keras import layers
import numpy as np
import matplotlib.pyplot as plt


class CharTable(object):
    def __init__(self, chars):
        self.chars = sorted(set(chars))
        self.char_indices = dict((c, i) for i, c in enumerate(self.chars))
        self.indices_char = dict((i, c) for i, c in enumerate(self.chars))

    def encode(self, token, num_rows):
        x = np.zeros((num_rows, len(self.chars)))
        for i, c in enumerate(token):
            x[i, self.char_indices[c]] = 1
        return x

    def decode(self, x, calc_argmax=True):
        if calc_argmax:
            x = [x.argmax(axis=-1)]
        return ''.join(self.indices_char[int(v)] for v in x)


def main():
    word_len = 4
    max_len_x = 15 * word_len + (word_len - 1)
    max_len_y = word_len

    input_list = []
    output_list = []
    fin = 'words_morse10.txt'
    with open(fin, 'r') as file:
        for line in file.read().splitlines():
            mylist = line.split(", ")
            [word, morse] = mylist
            morse = morse + ' ' * (max_len_x - len(morse))
            if len(word) == word_len:
                input_list.append(morse)
                output_list.append(word)

    chars_in = '10 '
    chars_out = 'abcdefghijklmnopqrstuvwxyz '
    ctable_in = CharTable(chars_in)
    ctable_out = CharTable(chars_out)

    x = np.zeros((len(input_list), max_len_x, len(chars_in)))
    y = np.zeros((len(output_list), max_len_y, len(chars_out)))
    for i, token in enumerate(input_list):
        x[i] = ctable_in.encode(token, max_len_x)
    for i, token in enumerate(output_list):
        y[i] = ctable_out.encode(token, max_len_y)

    indices = np.arange(len(y))
    np.random.shuffle(indices)
    x = x[indices]
    y = y[indices]

    m = len(x) - 100
    (x_train, x_val) = x[:m], x[m:]
    (y_train, y_val) = y[:m], y[m:]

    hidden_size = 128
    batch_size = 128
    nlayers = 1
    epochs = 150

    model = Sequential()
    model.add(layers.LSTM(hidden_size, input_shape=(max_len_x, len(chars_in))))
    model.add(layers.RepeatVector(word_len))

    for _ in range(nlayers):
        model.add(layers.LSTM(hidden_size, return_sequences=True))

    model.add(layers.TimeDistributed(layers.Dense(len(chars_out), activation='softmax')))
    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])
    model.summary()

    hist = model.fit(x_train, y_train, batch_size=batch_size,
                     epochs=epochs, verbose=2, validation_data=(x_val, y_val))

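    # predict_classes() was removed from recent Keras; argmax over the softmax
    # output gives the same class indices.
    predict = model.predict(x_val).argmax(axis=-1)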

    for i in range(len(x_val)):
        print("".join([ctable_in.decode(code) for code in x_val[i]]),
              "".join([ctable_out.decode(code) for code in y_val[i]]), end="     ")
        for j in range(word_len):
            print(ctable_out.indices_char[predict[i][j]], end="")
        print()

    plt.figure(figsize=(16, 5))
    plt.subplot(121)
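    # Older Keras records 'acc'; newer versions record 'accuracy'.
    acc_key = 'acc' if 'acc' in hist.history else 'accuracy'
    plt.plot(hist.history[acc_key])
    plt.plot(hist.history['val_' + acc_key])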
    plt.title('model accuracy')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.subplot(122)
    plt.plot(hist.history['loss'])
    plt.plot(hist.history['val_loss'])
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper right')
    plt.show()


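main()

The following program generates the training sequence.

import numpy as np

def morse_encode(word):
    # Look up each letter and separate the codes with a single space.
    return " ".join(morse_dict[c] for c in word)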


def data_gen():
    fin = 'words_alpha.txt'

    with open(fin, 'r') as file:
        for word in file.read().lower().splitlines():
            print(word, morse_encode(word), sep=", ")

    return


alphabet = list("abcdefghijklmnopqrstuvwxyz")

# values = ['.-', '-...', '-.-.', '-..', '.', '..-.', '--.', '....', '..', '.---', '-.-',
#           '.-..', '--', '-.', '---', '.--.', '--.-',
#           '.-.', '...', '-', '..-', '...-', '.--', '-..-', '-.--', '--..']

values = ['101110', '1110101010', '111010111010', '11101010', '10', '1010111010',
          '1110111010', '10101010', '1010', '10111011101110', '1110101110',
          '1011101010', '11101110', '111010', '111011101110', '101110111010',
          '11101110101110', '10111010', '101010', '1110', '10101110', '1010101110',
          '1011101110', '111010101110', '11101011101110', '111011101010']

morse_dict = dict(zip(alphabet, values))

data_gen()

CW Decode and Deep Learning (2)

a, .-
aa, .- .-
aal, .- .- .-..
aalii, .- .- .-.. .. ..
    (many lines deleted)
zythia, --.. -.-- - .... .. .-
zythum, --.. -.-- - .... ..- --
zyzomys, --.. -.-- --.. --- -- -.-- ...
zyzzogeton, --.. -.-- --.. --.. --- --. . - --- -.

This is a training sequence with 234,369 lines, generated by the short Python program below.

Let’s see if we can decode the words with four characters.

Train on 4894 samples, validate on 100 samples

We have 4894+100 such words, so we randomly choose 100 words and put them aside so that they are only used for validation and not for training.

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
lstm_1 (LSTM)                (None, 128)               67584     
_________________________________________________________________
repeat_vector_1 (RepeatVecto (None, 4, 128)            0         
_________________________________________________________________
lstm_2 (LSTM)                (None, 4, 128)            131584    
_________________________________________________________________
lstm_3 (LSTM)                (None, 4, 128)            131584    
_________________________________________________________________
lstm_4 (LSTM)                (None, 4, 128)            131584    
_________________________________________________________________
time_distributed_1 (TimeDist (None, 4, 27)             3483      
=================================================================
Total params: 465,819
Trainable params: 465,819
Non-trainable params: 0
-.-. --- .--. .-    copa     copa
-.- -.-- .- ....    kyah     kyah
-... .-. .- .       brae     brae
-. .- .-. -.-       nark     nark
.--. .... --- ....  phoh     phob
.- --.. --- -.      azon     auon
-.-. --- ...- .     cove     cove
-... .- .-. ..      bari     bari
-- . .- -.-         meak     meak
-- --- -. --.       mong     mong
-- .- - .           mate     mate
- .... .. .-.       thir     thir

Not bad?

Note that the decoding program contains no Morse code table whatsoever.

from keras.models import Sequential
from keras import layers
import numpy as np
import matplotlib.pyplot as plt


class CharTable(object):
    def __init__(self, chars):
        self.chars = sorted(set(chars))
        self.char_indices = dict((c, i) for i, c in enumerate(self.chars))
        self.indices_char = dict((i, c) for i, c in enumerate(self.chars))

    def encode(self, token, num_rows):
        x = np.zeros((num_rows, len(self.chars)))
        for i, c in enumerate(token):
            x[i, self.char_indices[c]] = 1
        return x

    def decode(self, x, calc_argmax=True):
        if calc_argmax:
            x = [x.argmax(axis=-1)]
        return ''.join(self.indices_char[int(v)] for v in x)


def main():
    word_len = 4
    max_len_x = 4 * word_len + (word_len - 1)
    max_len_y = word_len

    input_list = []
    output_list = []
    fin = 'words_morse.txt'
    with open(fin, 'r') as file:
        for line in file.read().splitlines():
            mylist = line.split(", ")
            [word, morse] = mylist
            morse = morse + ' ' * (max_len_x - len(morse))
            if len(word) == word_len:
                input_list.append(morse)
                output_list.append(word)

    chars_in = '-. '
    chars_out = 'abcdefghijklmnopqrstuvwxyz '
    ctable_in = CharTable(chars_in)
    ctable_out = CharTable(chars_out)

    x = np.zeros((len(input_list), max_len_x, len(chars_in)))
    y = np.zeros((len(output_list), max_len_y, len(chars_out)))
    for i, token in enumerate(input_list):
        x[i] = ctable_in.encode(token, max_len_x)
    for i, token in enumerate(output_list):
        y[i] = ctable_out.encode(token, max_len_y)

    indices = np.arange(len(y))
    np.random.shuffle(indices)
    x = x[indices]
    y = y[indices]

    m = len(x) - 100
    (x_train, x_val) = x[:m], x[m:]
    (y_train, y_val) = y[:m], y[m:]

    hidden_size = 128
    batch_size = 128
    nlayers = 3
    epochs = 100

    model = Sequential()
    model.add(layers.LSTM(hidden_size, input_shape=(max_len_x, len(chars_in))))
    model.add(layers.RepeatVector(word_len))

    for _ in range(nlayers):
        model.add(layers.LSTM(hidden_size, return_sequences=True))

    model.add(layers.TimeDistributed(layers.Dense(len(chars_out), activation='softmax')))
    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])
    model.summary()

    hist = model.fit(x_train, y_train, batch_size=batch_size,
                     epochs=epochs, verbose=2, validation_data=(x_val, y_val))

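    # predict_classes() was removed from recent Keras; argmax over the softmax
    # output gives the same class indices.
    predict = model.predict(x_val).argmax(axis=-1)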

    for i in range(len(x_val)):
        print("".join([ctable_in.decode(code) for code in x_val[i]]),
              "".join([ctable_out.decode(code) for code in y_val[i]]), end="     ")
        for j in range(word_len):
            print(ctable_out.indices_char[predict[i][j]], end="")
        print()

    plt.figure(figsize=(16, 5))
    plt.subplot(121)
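    # Older Keras records 'acc'; newer versions record 'accuracy'.
    acc_key = 'acc' if 'acc' in hist.history else 'accuracy'
    plt.plot(hist.history[acc_key])
    plt.plot(hist.history['val_' + acc_key])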
    plt.title('model accuracy')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.subplot(122)
    plt.plot(hist.history['loss'])
    plt.plot(hist.history['val_loss'])
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper right')
    plt.show()


main()

The following program generates the training sequence. A Morse code table is required only in this program.

import numpy as np

def morse_encode(word):
    # Look up each letter and separate the codes with a single space.
    return " ".join(morse_dict[c] for c in word)


def data_gen():
    fin = 'words_alpha.txt'

    with open(fin, 'r') as file:
        for word in file.read().lower().splitlines():
            print(word, morse_encode(word), sep=", ")

    return


alphabet = list("abcdefghijklmnopqrstuvwxyz")
values = ['.-', '-...', '-.-.', '-..', '.', '..-.', '--.', '....', '..', '.---', '-.-',
          '.-..', '--', '-.', '---', '.--.', '--.-',
          '.-.', '...', '-', '..-', '...-', '.--', '-..-', '-.--', '--..']
morse_dict = dict(zip(alphabet, values))

data_gen()

Deep Learning with Keras

One simple example is here:

https://keras.io/examples/addition_rnn/

https://blog.keras.io/a-ten-minute-introduction-to-sequence-to-sequence-learning-in-keras.html

LSTM stands for long short-term memory, a kind of recurrent neural network (RNN).

Some learning curves are shown in the Figure with different hyperparameters.

Naturally, the prediction is not perfect.

from keras.models import Sequential
from keras import layers
import numpy as np
from six.moves import range


class CharacterTable(object):
    """Given a set of characters:
    + Encode them to a one-hot integer representation
    + Decode the one-hot or integer representation to their character output
    + Decode a vector of probabilities to their character output
    """
    def __init__(self, chars):
        """Initialize character table.

        # Arguments
            chars: Characters that can appear in the input.
        """
        self.chars = sorted(set(chars))
        self.char_indices = dict((c, i) for i, c in enumerate(self.chars))
        self.indices_char = dict((i, c) for i, c in enumerate(self.chars))

    def encode(self, C, num_rows):
        """One-hot encode given string C.

        # Arguments
            C: string, to be encoded.
            num_rows: Number of rows in the returned one-hot encoding. This is
                used to keep the # of rows for each data the same.
        """
        x = np.zeros((num_rows, len(self.chars)))
        for i, c in enumerate(C):
            x[i, self.char_indices[c]] = 1
        return x

    def decode(self, x, calc_argmax=True):
        """Decode the given vector or 2D array to their character output.

        # Arguments
            x: A vector or a 2D array of probabilities or one-hot representations;
                or a vector of character indices (used with `calc_argmax=False`).
            calc_argmax: Whether to find the character index with maximum
                probability, defaults to `True`.
        """
        if calc_argmax:
            x = x.argmax(axis=-1)
        return ''.join(self.indices_char[x] for x in x)


class colors:
    ok = '\033[92m'
    fail = '\033[91m'
    close = '\033[0m'


# Parameters for the model and dataset.
TRAINING_SIZE = 50000
DIGITS = 3
REVERSE = True

# Maximum length of input is 'int + int' (e.g., '345+678'). Maximum length of
# int is DIGITS.
MAXLEN = DIGITS + 1 + DIGITS

# All the numbers, plus sign and space for padding.
chars = '0123456789+ '
ctable = CharacterTable(chars)

questions = []
expected = []
seen = set()
print('Generating data...')
while len(questions) < TRAINING_SIZE:
    f = lambda: int(''.join(np.random.choice(list('0123456789'))
                    for i in range(np.random.randint(1, DIGITS + 1))))
    a, b = f(), f()
    # Skip any addition questions we've already seen
    # Also skip any such that x+Y == Y+x (hence the sorting).
    key = tuple(sorted((a, b)))
    if key in seen:
        continue
    seen.add(key)
    # Pad the data with spaces such that it is always MAXLEN.
    q = '{}+{}'.format(a, b)
    query = q + ' ' * (MAXLEN - len(q))
    ans = str(a + b)
    # Answers can be of maximum size DIGITS + 1.
    ans += ' ' * (DIGITS + 1 - len(ans))
    if REVERSE:
        # Reverse the query, e.g., '12+345  ' becomes '  543+21'. (Note the
        # space used for padding.)
        query = query[::-1]
    questions.append(query)
    expected.append(ans)
print('Total addition questions:', len(questions))

print('Vectorization...')
x = np.zeros((len(questions), MAXLEN, len(chars)), dtype=bool)
y = np.zeros((len(questions), DIGITS + 1, len(chars)), dtype=bool)
for i, sentence in enumerate(questions):
    x[i] = ctable.encode(sentence, MAXLEN)
for i, sentence in enumerate(expected):
    y[i] = ctable.encode(sentence, DIGITS + 1)

# Shuffle (x, y) in unison as the later parts of x will almost all be larger
# digits.
indices = np.arange(len(y))
np.random.shuffle(indices)
x = x[indices]
y = y[indices]

# Explicitly set apart 10% for validation data that we never train over.
split_at = len(x) - len(x) // 10
(x_train, x_val) = x[:split_at], x[split_at:]
(y_train, y_val) = y[:split_at], y[split_at:]

print('Training Data:')
print(x_train.shape)
print(y_train.shape)

print('Validation Data:')
print(x_val.shape)
print(y_val.shape)

# Try replacing GRU, or SimpleRNN.
RNN = layers.LSTM
HIDDEN_SIZE = 128
BATCH_SIZE = 128
LAYERS = 1

print('Build model...')
model = Sequential()
# "Encode" the input sequence using an RNN, producing an output of HIDDEN_SIZE.
# Note: In a situation where your input sequences have a variable length,
# use input_shape=(None, num_feature).
model.add(RNN(HIDDEN_SIZE, input_shape=(MAXLEN, len(chars))))
# As the decoder RNN's input, repeatedly provide with the last output of
# RNN for each time step. Repeat 'DIGITS + 1' times as that's the maximum
# length of output, e.g., when DIGITS=3, max output is 999+999=1998.
model.add(layers.RepeatVector(DIGITS + 1))
# The decoder RNN could be multiple layers stacked or a single layer.
for _ in range(LAYERS):
    # By setting return_sequences to True, return not only the last output but
    # all the outputs so far in the form of (num_samples, timesteps,
    # output_dim). This is necessary as TimeDistributed in the below expects
    # the first dimension to be the timesteps.
    model.add(RNN(HIDDEN_SIZE, return_sequences=True))

# Apply a dense layer to the every temporal slice of an input. For each of step
# of the output sequence, decide which character should be chosen.
model.add(layers.TimeDistributed(layers.Dense(len(chars), activation='softmax')))
model.compile(loss='categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])
model.summary()

# Train the model each generation and show predictions against the validation
# dataset.

for iteration in range(1, 200):
    correct_count = 0
    error_count = 0
    print()
    print('- ' * 50)
    print('Iteration', iteration)
    model.fit(x_train, y_train,
              batch_size=BATCH_SIZE,
              epochs=1,
              validation_data=(x_val, y_val))
    # Show the prediction for every sample in the validation set so we can
    # visualize errors and compute a score.
    for i in range(len(x_val)):
        rowx, rowy = x_val[np.array([i])], y_val[np.array([i])]
        preds = model.predict(rowx, verbose=0).argmax(axis=-1)
        q = ctable.decode(rowx[0])
        correct = ctable.decode(rowy[0])
        guess = ctable.decode(preds[0], calc_argmax=False)
        print('Q', q[::-1] if REVERSE else q, end=' ')
        print('T', correct, end=' ')
        if correct == guess:
            print(colors.ok + '☑' + colors.close, end=' ')
            correct_count = correct_count + 1
        else:
            print(colors.fail + '☒' + colors.close, end=' ')
            error_count = error_count + 1
        print(guess)

    print("score = ", correct_count / (correct_count+error_count))

AM Demodulation

Listening to an AM station on 954 kHz.

If you can demodulate a CW signal, you can also demodulate AM signals just by changing your side tone frequency, usually 700 Hz or so, to 0 Hz.

The AM carrier component is translated to a near-DC signal, which can easily be eliminated by applying, say, a high-pass filter (HPF).
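
The idea can be tried on a synthetic signal. The following is only a rough sketch under several assumptions that are not from the receiver described here: the sample rate, carrier and tone frequencies are made up, the local oscillator is assumed to match the carrier exactly in frequency and phase, the low-pass filter is a plain moving average, and the HPF is replaced by simply subtracting the mean.

```python
import numpy as np

fs = 48000                  # sample rate in Hz (assumed)
fc = 10000                  # carrier frequency in Hz (assumed)
fm = 440                    # audio tone in Hz (assumed)
t = np.arange(fs) / fs      # one second of samples

audio = np.cos(2 * np.pi * fm * t)
am = (1 + 0.5 * audio) * np.cos(2 * np.pi * fc * t)

# Product detector with the "side tone" set to 0 Hz: the carrier lands at DC.
mixed = am * np.cos(2 * np.pi * fc * t)

# Crude low-pass filter (moving average) to suppress the component at 2*fc.
taps = 12
lowpassed = np.convolve(mixed, np.ones(taps) / taps, mode='same')

# Crude stand-in for the HPF: subtracting the mean removes the DC carrier term.
recovered = lowpassed - lowpassed.mean()

# Locate the strongest remaining component; it should be the audio tone.
spectrum = np.abs(np.fft.rfft(recovered))
peak_hz = np.argmax(spectrum) * fs / len(recovered)
print(peak_hz)
```

In a real receiver the local oscillator is never exactly on the carrier, so the carrier ends up near DC rather than at DC, which is why a proper high-pass filter is used instead of a mean subtraction.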