Refactor a legacy Worker Base - Part 2 - Scope Management

Rewrite the WorkerBase class

After fixing the Scope management problem, it’s time to rewrite the WorkerBase class so that the components are loosely coupled, composable and detachable. The solution turned out to be a very simple approach: the middleware design that is very common in popular Web server frameworks (ASP.Net Core, Express.js, Koa.js,…).

In case you don’t know what a middleware is, read ASP.Net Core Middleware.

After analyzing the legacy WorkerBase class, I found that its responsibilities could be organized into these middlewares, sketched below

  • Exception handling middleware
  • Logging middleware
  • Message queue behaviors middleware

Worker middlewares
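To make the idea concrete, here is a minimal sketch of what such a pipeline could look like in C#. The delegate shape mimics ASP.Net Core’s middleware; MessageContext, the middleware names and the wiring are illustrative assumptions, not the exact production code.

using System;
using System.Threading.Tasks;

// A delegate-based worker middleware pipeline, modeled on ASP.Net Core's design
public class MessageContext
{
    public object Message { get; set; }
}

public delegate Task MessageDelegate(MessageContext context);

public interface IWorkerMiddleware
{
    Task InvokeAsync(MessageContext context, MessageDelegate next);
}

// Example: the exception handling middleware wraps everything that runs after
// it, so every worker gets the same error handling behavior for free
public class ExceptionHandlingMiddleware : IWorkerMiddleware
{
    public async Task InvokeAsync(MessageContext context, MessageDelegate next)
    {
        try
        {
            await next(context);
        }
        catch (Exception ex)
        {
            // log, alert and ack/nack the message in one single place
            Console.WriteLine($"Message failed: {ex.Message}");
        }
    }
}

Each middleware only knows about the context and the next delegate, which is exactly what makes the components detachable and composable.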

Read more

Refactor a legacy Worker Base - Part 1 - The long lasting pain

The refactoring solution I present in this post is written in C#. It doesn’t mean you cannot do this in Nodejs. It is just that the team is migrating away from Nodejs to C#. We are familiar with these tools and they are already available as standard patterns in C#.

First things first: An IOC Container

We learnt this at university. Why the heck did we forget it? Is it because the programming language allows us to make this mistake so easily, or is it because the community encourages these bad behaviors everywhere?

As I mentioned earlier, scope management in the legacy codebase is awful. We used a function to wrap the scope of a message, and the derived class is just a collection of functions, not a scope container. Every time we want to call a new method, we have to pass all the parameters downstream.

class WorkerBase {
  async start() {
    let message;
    do {
      message = await pullMessages(1);
      if (message == null) break; // stop when the queue is empty
      const context = this.buildContext(message);

      // this processMessage function wraps the scope of a message
      await this.processMessage(message, context);
    } while (message != null);
  }
}

// Worker service 1
class Worker1 extends WorkerBase {
  myProp = 1;

  async processMessage(message, context) {
    this.logic1(message, context);
    this.logic2(message, context);

    this.myProp++; // this mutates myProp and affects every other message
  }

  logic1(message, context) {}

  logic2(message, context) {}
}

We wrote JS in an OOP way but didn’t apply the OOP best practices!
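For contrast, here is a minimal sketch of what per-message scope management could look like with an IOC container in C#. I use Microsoft.Extensions.DependencyInjection here (Autofac works the same way); PullMessageAsync and IMessageHandler are hypothetical stand-ins for the real queue client and message handler.

using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

public interface IMessageHandler
{
    Task HandleAsync(object message);
}

public class Worker
{
    private readonly IServiceScopeFactory _scopeFactory;

    public Worker(IServiceScopeFactory scopeFactory) => _scopeFactory = scopeFactory;

    public async Task StartAsync()
    {
        while (true)
        {
            var message = await PullMessageAsync(); // hypothetical queue client call
            if (message == null) break;

            // each message gets its own scope: scoped state lives and dies
            // with the message instead of leaking across messages
            using var scope = _scopeFactory.CreateScope();
            var handler = scope.ServiceProvider.GetRequiredService<IMessageHandler>();
            await handler.HandleAsync(message);
        }
    }

    private Task<object> PullMessageAsync() =>
        throw new System.NotImplementedException(); // stand-in for the real queue
}

With this shape there is no shared mutable field like myProp: anything registered as scoped is constructed fresh for each message and disposed with the scope.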

Read more

Let’s talk about Microservices again!

In order to manage a Microservices system efficiently, people usually enforce all the microservices to follow some common patterns. This helps you standardize the monitoring process and add a new microservice more easily. In the Microservices project that I’m currently working on (at Agency Revolution), we also have to implement a base class for each different type of microservice. The base class contains the logic to write logs in the correct format, to handle some common errors correctly, to alert the developers if something goes wrong, etc.

Basically, there are 2 types of Microservices in our system: Synchronous and Asynchronous. I will focus mostly on one type of Async worker in this post: The Message Queue workers. The base class was initially built in Nodejs. After several years of development, we started to face many problems with the design. And now, I’m going to show you how I identified the drawbacks and improved it with a better version in C#.

Why C#? I may explain in another post. But right now, you can take a look at this post first.

How it all began

First, we started with this Inheritance model, the design that most people will think of when they start implementing a Worker base module. We defined a super class that all the workers in the system derive from. It contains the logic to pull messages from the corresponding queue and invoke the main handler function.

// This is Javascript code
// The base class
class WorkerBase {
  constructor(config) { this.queueName = config.queueName; }

  async start() {
    let message;
    do {
      message = await pullMessages(1); // pull 1 message at a time
      if (message == null) break; // stop when the queue is empty
      await this.processMessage(message);
    } while (message != null);
  }

  // implement this in the derived class
  async processMessage(message) { throw new Error('Not implemented'); }
}

// Worker service 1
class Worker1 extends WorkerBase {
  constructor() { super({ queueName: 'Worker1' }); }

  async processMessage(message) {
    // implement worker1 logic here
  }
}

// Worker service 2
class Worker2 extends WorkerBase {
  constructor() { super({ queueName: 'Worker2' }); }

  async processMessage(message) {
    // implement worker2 logic here
  }
}

// to activate a worker
const worker = new Worker1();
await worker.start();
Read more

Nothing special here. It’s just a blog post summarising my algorithm learning course. Here are some questions related to Priority Queues.

Related knowledge: Binary Heap & Heapsort Summary - Part 1 - Binary Heap

1. Dynamic median

Design a data type that supports insert in logarithmic time, find-the-median in constant time, and remove-the-median in logarithmic time. If the number of keys in the data type is even, find/remove the lower median.

Solution: Use 2 Binary Heaps

  • A Max heap to store the smaller half of the items
    • No item in the Max heap is bigger than any item in the Min heap
  • A Min heap to store the other half
    • No item in the Min heap is smaller than any item in the Max heap
  • The 2 heaps should stay balanced. That means the number of items in each heap should be equal, or differ by at most 1.

You will need these methods

class Median {
    int[] maxHeap; // the smaller half
    int[] minHeap; // the larger half

    void balance() {...}     // keep the two heaps within 1 item of each other
    void insert(int key) {...}   // O(log n)
    int findMedian() {...}       // O(1)
    int removeMedian() {...}     // O(log n)
}
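Here is a minimal sketch of the same idea in C#, using the built-in PriorityQueue from .NET 6. The names mirror the skeleton above; treat it as an illustration of the two-heap technique rather than a polished implementation.

using System;
using System.Collections.Generic;

public class DynamicMedian
{
    // Max heap: holds the smaller half, lower median on top
    private readonly PriorityQueue<int, int> _low =
        new(Comparer<int>.Create((a, b) => b.CompareTo(a)));
    // Min heap: holds the larger half
    private readonly PriorityQueue<int, int> _high = new();

    public void Insert(int key)             // O(log n)
    {
        if (_low.Count == 0 || key <= _low.Peek()) _low.Enqueue(key, key);
        else _high.Enqueue(key, key);
        Balance();
    }

    public int FindMedian() => _low.Peek(); // O(1), lower median when count is even

    public int RemoveMedian()               // O(log n)
    {
        var median = _low.Dequeue();
        Balance();
        return median;
    }

    // keep the sizes within 1 of each other, with _low holding the extra item
    private void Balance()
    {
        if (_low.Count > _high.Count + 1)
        {
            var x = _low.Dequeue();
            _high.Enqueue(x, x);
        }
        else if (_high.Count > _low.Count)
        {
            var x = _high.Dequeue();
            _low.Enqueue(x, x);
        }
    }
}

Insert and RemoveMedian cost one or two heap operations each (logarithmic), while FindMedian is a constant-time Peek, which matches the requirements of the exercise.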
Read more

Clean architecture with C#/.Net Core and MediatR - Part 3

The MediatR Library

Clean Architecture is, at its core, an Interface design. Everything is connected via Interfaces and their hidden Implementations. A request comes from the Framework layer to the Business handler via a Business interface. A request from the Business circle to the database or other services is also activated through an Adapter interface.

The Mediator pattern and the MediatR library are just another way to write Interfaces and Implementations (via the TRequest/TResponse types). In fact, you can simply define your own interface and attach the corresponding implementation through your IOC container. However, the main reason I use MediatR is its excellent dynamic pipeline behaviors, which help me separate most of the cross-cutting concerns out of the main handler, keep everything cleaner and produce a concise, testable handler class.

A very simple handler in MediatR looks like this

public class InsertUser
{
    /// <summary>
    /// Insert a new User and return that User
    /// </summary>
    public class Request : IRequest<Response>
    {
        public int ClientId { get; set; }
        public string Username { get; set; }
        public string Password { get; set; }
    }

    public class Response
    {
        public User User { get; set; }
    }

    public class Handler : IRequestHandler<Request, Response>
    {
        public async Task<Response> Handle(Request request, CancellationToken cancellationToken)
        {
            // e.g. validate the request first
            await CheckExistence(request.Username);

            // implement your logic here
            var user = await SomeFunction(cancellationToken);

            return new Response
            {
                User = user
            };
        }
    }
}

In this simplest form, it doesn’t look much different from what we usually do with a normal Interface. However, let’s imagine what will happen when you want to add these requirements

  • Log the related information to debug later.
  • Track the process metrics to monitor and analyze performance.
  • Lock the process to avoid race-condition.
  • Transform the request/response format in a pre-defined way.
  • Handle errors.
  • Other cross-cutting concerns?…
  • A more important question: How to group related requests and re-use these cross-cutting concern handlers? (see the pipeline behavior sketch below)
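This is where MediatR’s pipeline behaviors come in. As a taste, here is a minimal sketch of a logging behavior that wraps every matching handler. The Handle parameter order follows recent MediatR versions (older versions put the CancellationToken second), and the Console logging is just an illustrative placeholder.

using System;
using System.Threading;
using System.Threading.Tasks;
using MediatR;

public class LoggingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : notnull
{
    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken)
    {
        // runs before the handler (or the next behavior in the pipeline)
        Console.WriteLine($"Handling {typeof(TRequest).Name}");

        var response = await next();

        // runs after the handler has finished
        Console.WriteLine($"Handled {typeof(TRequest).Name}");
        return response;
    }
}

Register it once in the IOC container and every request/response pair flows through it, so the handler classes stay focused on the business logic.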
Read more

Clean architecture with C#/.Net Core and MediatR - Part 2

3. Runtime Layer

The Runtime Layer contains nearly no logic. It is simply a place to bootstrap the application and register all the necessary dependencies. It acts as a gateway that inputs the data to the Business Flows and transfers the output back to the caller. That means your Business Flows can be embedded into any Runtime type, from an HTTP API Server to a Worker that processes data from a Message Queue or a one-time Script,… Here are some examples of how they should look.

HTTP Server

Http Runtime

For HTTP Server, the APIs simply transform the data from the deserializable format (the HTTP Request) to the Business Flow input and then serialize the output to send back to the client.

In case you use ASP.Net Core and Autofac (like me)…

public class Startup
{
    // ...other methods

    /// <summary>
    /// Autofac method
    /// </summary>
    /// <param name="builder"></param>
    public void ConfigureContainer(ContainerBuilder builder)
    {
        builder.RegisterModule<Truongtx.Business.AutofacModule>();
        builder.RegisterModule<Truongtx.Adapter.AutofacModule>();
    }
}

[ApiController]
public class NpsController : ControllerBase
{
    private readonly Business.ISendMarketingEmails _sendMarketingEmails;

    public NpsController(Business.ISendMarketingEmails sendMarketingEmails)
    {
        _sendMarketingEmails = sendMarketingEmails;
    }

    /// <summary>
    /// Send Marketing Emails for a Campaign
    /// </summary>
    /// <param name="marketingCampaignId"></param>
    /// <returns></returns>
    [Route("/api/marketing-campaigns/{marketingCampaignId}/send-emails")]
    [HttpPost]
    public Task<string> SendMarketingEmails(int marketingCampaignId)
        => _sendMarketingEmails.Execute(marketingCampaignId);

    // ... other APIs
}
Read more

Clean architecture with C#/.Net Core and MediatR - Part 1

2. Adapter Layer

The Business layer mentioned before contains a list of interfaces to connect to other external dependencies (external services, database storage). It doesn’t care what database system is used or what protocol the external services need. All that logic is implemented in this Adapter layer.

Adapter Code

An implementation may look like this

public class GetContactsByMarketingCampaignId : IGetContactsByMarketingCampaignId
{
    private readonly IMapper _mapper;

    public GetContactsByMarketingCampaignId(IMapper mapper)
    {
        _mapper = mapper;
    }

    public IList<Business.Contact> Execute(int marketingCampaignId)
    {
        // get from Redis cache and then fallback to SQL
        var contacts = GetFromRedis(marketingCampaignId) ?? GetFromSql(marketingCampaignId);

        // use AutoMapper to map back to Business model
        return _mapper.Map<IList<Business.Contact>>(contacts);
    }

    private IList<SqlModels.Contact> GetFromRedis(int marketingCampaignId)
    {
        // logic to get from redis here
        ...
    }

    private IList<SqlModels.Contact> GetFromSql(int marketingCampaignId)
    {
        // logic to get from sql here
        ...
    }
}
Read more

Okay, I’m porting some modules from Nodejs to C# and I couldn’t find any built-in modules or libraries to do this, so I had to implement it manually, luckily with some help from Stackoverflow.

I have a message that was encrypted using crypto-js and stored in the database. Here is the Nodejs code that generates the encrypted data

const cryptojs = require('crypto-js');
const encryptedMsg = cryptojs.AES.encrypt('message', 'secret').toString();

The result is a string that looks like this

U2FsdGVkX184KJolbrZkg8w+rX/V9OW7sbUvWPVogdY=

Now, I need to read it back in C# and decrypt it to get the original message. The built-in Aes class in C# requires a Key and an IV to be explicitly passed in, but there is no utility to generate the Key and the IV from a specified string. The above encrypt method from crypto-js derives the Key and the IV from the passphrase implicitly. It doesn’t play well with C# and is actually not the AES standard (crypto-js still allows you to pass in the Key and IV explicitly).

For the AES Cipher Algorithm, we need a Key and an IV (Initialization Vector) to add randomness to the encrypted data.

After playing around with the crypto-js code base and with help from Stackoverflow, I finally figured out how the data is stored and how the Key/IV are generated. In order to derive a key from the passphrase, it uses the OpenSSL-compatible derivation function EVP_BytesToKey. Here are the steps (a C# sketch follows the list)

  • Generate a random 8-byte salt.
  • Use it along with the input passphrase to generate the Key and the IV.
  • The Key and the IV are then fed into the AES function to produce the ciphertext.
  • The final result is a base64-encoded string starting with the literal Salted__ marker, followed by the 8-byte salt and the actual ciphertext.
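Putting it together, here is a sketch of the decryption side in C#. It assumes the crypto-js defaults (AES-256-CBC with an MD5-based EVP_BytesToKey derivation); the class name is mine.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

public static class CryptoJsAes
{
    public static string Decrypt(string base64Cipher, string passphrase)
    {
        var raw = Convert.FromBase64String(base64Cipher);

        // bytes 0-7 are the literal "Salted__" marker, bytes 8-15 are the salt
        var salt = raw[8..16];
        var cipherText = raw[16..];

        // EVP_BytesToKey: chain MD5 over (previousDigest + passphrase + salt)
        // until there are 48 bytes: a 32-byte Key followed by a 16-byte IV
        var pass = Encoding.UTF8.GetBytes(passphrase);
        var keyIv = new List<byte>();
        var digest = Array.Empty<byte>();
        while (keyIv.Count < 48)
        {
            digest = MD5.HashData(digest.Concat(pass).Concat(salt).ToArray());
            keyIv.AddRange(digest);
        }

        using var aes = Aes.Create(); // CBC + PKCS7 padding by default
        aes.Key = keyIv.Take(32).ToArray();
        aes.IV = keyIv.Skip(32).Take(16).ToArray();

        using var decryptor = aes.CreateDecryptor();
        var plain = decryptor.TransformFinalBlock(cipherText, 0, cipherText.Length);
        return Encoding.UTF8.GetString(plain);
    }
}

Calling CryptoJsAes.Decrypt(encryptedMsg, "secret") on a string produced by the Nodejs snippet above should give back the original 'message'.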
Read more

Nodejs has been a pain point in our code base for years. It used to be the best choice when we started building our product, but I have never considered it a good solution for scaling. I have been trying to find a better language and a better architecture to help the team scale in the future. I finally decided to go with C# and Clean Architecture. They are not the best choices, but at least they fit the existing tech stack of the organization.

I will have another series talking about the mistakes in designing applications from my experience (which are also related to the Nodejs code base). In this post, I’m going to summarize how I built the new architecture using Clean Architecture with C# and how MediatR helps make it really clean.

Clean Architecture revisit

You may have already seen the famous Clean Architecture circle diagram many times before. It’s a bit complicated for me so I will make it simple by just drawing these 3 circles.

Reference

Each circle is represented by a Project in C#. The outer one references the inner one, never the reverse. The inner one should have no knowledge of the outer framework that it runs on top of.

Read more

Scaling the System at AR - Part 5 - Message Queue for Scaling team

If you have read some of my previous blog posts, you may know that we have been stuck with Rethinkdb for years. Rethinkdb was a good database. However, its development stopped some years ago and there is no sign that it will be continued in the future. We have been following some very active guys in the community and even thought about donating to them. However, all of them have lost their interest in Rethinkdb and decided to move forward with other alternative solutions. Also, as I have already mentioned in Mistakes of a Software Engineer - Favor NoSQL over SQL, most of our use cases no longer suit the design of Rethinkdb and all the optimizations that we made are reaching their limit.

After several discussions and analysis, we decided to move away from Rethinkdb to MS SQL Server. Some requirements that we have to satisfy are

  • The user should be able to view and edit the data normally, without any downtime.
  • There should be a backup plan for it.
  • There should be an experimental period, where we can pick some users, turn on the new database and analyze the correctness of the data.

This can be achieved easily using the Pub/Sub model and Message Queue design described in Scaling the System at AR - Part 5 - Message Queue for Scaling team.

Flow
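To make the idea a bit more concrete, here is a rough sketch (mine, not from the original post) of the shape of such a subscriber: every data change is published as an event, and the experimental SQL Server side consumes it independently of the primary flow.

using System.Threading.Tasks;

// the event published whenever a contact changes; the shape is illustrative
public record ContactChanged(int ContactId, string PayloadJson);

public interface IContactWriter
{
    Task UpsertAsync(ContactChanged evt);
}

// subscriber that projects changes into MS SQL Server; because it consumes
// from its own queue, it can lag or fail during the experimental period
// without affecting the users' normal view/edit flow on the old database
public class SqlServerSubscriber
{
    private readonly IContactWriter _writer;

    public SqlServerSubscriber(IContactWriter writer) => _writer = writer;

    public Task HandleAsync(ContactChanged evt) => _writer.UpsertAsync(evt);
}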

Read more