ASP.NET Core RethinkDB
RethinkDB is an open source, distributed, document-oriented database. Like most NoSQL databases, it stores JSON documents with no required schema or table structure.
What makes RethinkDB great:
- horizontal scaling
- table replication and sharding
- automatic fail-over
- query language with join support, map-reduce and geospatial queries
- pub/sub for data changes
If you want a scalable, fault-tolerant database system for your ASP.NET Core applications, you should consider using RethinkDB.
This guide shows you how to use RethinkDB from an ASP.NET Core app and how to deploy your app along with a RethinkDB cluster using Docker for Windows and Docker Swarm mode. We will use the TokenGen app from the Scale ASP.NET Core apps with Docker Swarm Mode guide and store the generated tokens, along with their issuers, in a RethinkDB database.
First we need to enable Swarm Mode and create a dedicated network for our RethinkDB cluster:
# initialize swarm
docker swarm init
# create RethinkDB overlay network
docker network create --driver overlay rdb-net
We start building our RethinkDB cluster by running a single RethinkDB server. We will remove this instance later on:
# create and start rethinkdb primary
docker service create --name rdb-primary --network rdb-net --replicas 1 rethinkdb:latest rethinkdb --bind all --no-http-admin
Now we can create a secondary RethinkDB node that will join the rdb-primary node and form a cluster:
# create and start rethinkdb secondary
docker service create --name rdb-secondary --network rdb-net --replicas 1 rethinkdb:latest rethinkdb --bind all --no-http-admin --join rdb-primary
Scale the secondary service to two instances so that the cluster has the minimum of 3 nodes required by the RethinkDB automatic failover mechanism:
# up 3 nodes (primary + two secondary) to enable automatic failover
docker service scale rdb-secondary=2
We now have a functional RethinkDB cluster, but we are not done yet.
Because we started the primary node without a join command, our cluster has a single point of failure.
If the rdb-primary container crashes for some reason, the Docker Swarm engine will recreate and start it, but the new container can't rejoin the existing cluster. If we then start new rdb-secondary instances, they will join the new rdb-primary container and form another cluster.
To resolve this issue we have to remove the rdb-primary service and recreate it with the --join option, like so:
# remove primary
docker service rm rdb-primary
# recreate primary with --join flag
docker service create --name rdb-primary --network rdb-net --replicas 1 rethinkdb:latest rethinkdb --bind all --no-http-admin --join rdb-secondary
Now we can also scale the primary node:
# start two rdb-primary instances
docker service scale rdb-primary=2
At this point we have 4 nodes in our cluster: two rdb-primary and two rdb-secondary. We can scale either of these two services further, and the new instances will all join our cluster. If an rdb-primary or rdb-secondary instance crashes, Docker Swarm will automatically start another container that will join our current cluster.
The last step is to create a RethinkDB proxy node. We expose port 8080 for the web admin and port 28015 so we can connect to the cluster from our app:
# create and start rethinkdb proxy
docker service create --name rdb-proxy --network rdb-net --publish 8080:8080 --publish 28015:28015 rethinkdb:latest rethinkdb proxy --bind all --join rdb-primary
Open a browser and navigate to http://localhost:8080 to check the cluster state. On the Servers page you should see 4 servers connected to the cluster.
Open the TokenGen project from the previous guide and install the RethinkDB driver NuGet package:
Install-Package RethinkDb.Driver
We are going to create an object that holds the RethinkDB cluster address. Add a class named RethinkDbOptions to the TokenGen project:
public class RethinkDbOptions
{
public string Host { get; set; }
public int Port { get; set; }
public string Database { get; set; }
public int Timeout { get; set; }
}
We will store the connection data in appsettings.json and use a RethinkDbOptions object to inject it into RethinkDbConnectionFactory.
The RethinkDbConnectionFactory will provide a persistent connection to the RethinkDB cluster for our app:
public class RethinkDbConnectionFactory : IRethinkDbConnectionFactory
{
private static RethinkDB R = RethinkDB.R;
private Connection conn;
private RethinkDbOptions _options;
public RethinkDbConnectionFactory(IOptions<RethinkDbOptions> options)
{
_options = options.Value;
}
public Connection CreateConnection()
{
if (conn == null)
{
conn = R.Connection()
.Hostname(_options.Host)
.Port(_options.Port)
.Timeout(_options.Timeout)
.Connect();
}
if(!conn.Open)
{
conn.Reconnect();
}
return conn;
}
public void CloseConnection()
{
if (conn != null && conn.Open)
{
conn.Close(false);
}
}
public RethinkDbOptions GetOptions()
{
return _options;
}
}
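The factory implements IRethinkDbConnectionFactory, which this guide does not list. A minimal sketch of that interface, inferred only from the public members of the class above (the actual interface in the TokenGen project may differ), could look like this:

```csharp
using RethinkDb.Driver.Net;

// Assumed shape, inferred from RethinkDbConnectionFactory above.
// RethinkDbOptions is the options class defined earlier in this guide.
public interface IRethinkDbConnectionFactory
{
    // Returns the shared connection, opening or reopening it if needed.
    Connection CreateConnection();

    // Closes the shared connection if it is open.
    void CloseConnection();

    // Exposes the bound settings (host, port, database, timeout).
    RethinkDbOptions GetOptions();
}
```

Depending on the interface rather than the concrete class is what allows the factory to be registered and injected as a singleton service.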
Open appsettings.json and add the RethinkDB cluster connection data:
"RethinkDbDev": {
"Host": "localhost",
"Port": 28015,
"Timeout": 10,
"Database": "TokenStore"
}
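To see how these keys map onto RethinkDbOptions, here is a small sketch that binds the same values from an in-memory configuration source instead of appsettings.json. It assumes the Microsoft.Extensions.Configuration and Microsoft.Extensions.Configuration.Binder packages are referenced; OptionsBindingDemo is a hypothetical helper name, and the options class is repeated only to keep the sample self-contained.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// Same shape as the RethinkDbOptions class defined earlier in this guide.
public class RethinkDbOptions
{
    public string Host { get; set; }
    public int Port { get; set; }
    public string Database { get; set; }
    public int Timeout { get; set; }
}

// Hypothetical helper, for illustration only.
public static class OptionsBindingDemo
{
    public static RethinkDbOptions Load()
    {
        // In-memory stand-in for the "RethinkDbDev" section of appsettings.json.
        // Nested JSON keys are flattened with ':' as the separator.
        var settings = new Dictionary<string, string>
        {
            ["RethinkDbDev:Host"] = "localhost",
            ["RethinkDbDev:Port"] = "28015",
            ["RethinkDbDev:Timeout"] = "10",
            ["RethinkDbDev:Database"] = "TokenStore"
        };

        IConfiguration config = new ConfigurationBuilder()
            .AddInMemoryCollection(settings)
            .Build();

        // Bind copies matching keys onto the public properties,
        // converting the string values to the property types.
        var options = new RethinkDbOptions();
        config.GetSection("RethinkDbDev").Bind(options);
        return options;
    }
}
```

The property names must match the configuration keys, which is why the JSON section uses Host, Port, Timeout and Database.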
Now we can register RethinkDbConnectionFactory as a singleton in Startup.cs. You should reuse the same connection across the whole application, since the RethinkDb connection is thread safe.
public void ConfigureServices(IServiceCollection services)
{
//....
services.Configure<RethinkDbOptions>(Configuration.GetSection("RethinkDbDev"));
services.AddSingleton<IRethinkDbConnectionFactory, RethinkDbConnectionFactory>();
}
Test the connection inside the Startup.Configure method:
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory,
IRethinkDbConnectionFactory connectionFactory)
{
//....
var con = connectionFactory.CreateConnection();
con.CheckOpen();
}
The TokenGen app logic involves two entities: token and issuer. Tokens are issued by an app instance (issuer) to a client by calling the /api/token endpoint. We will create a database to store these two entities.
First we create a class for each entity:
public class Issuer
{
[JsonProperty("id", NullValueHandling = NullValueHandling.Ignore)]
public string Id { get; set; }
public string Name { get; set; }
public string Version { get; set; }
public DateTime Timestamp { get; set; }
}
public class Token
{
[JsonProperty("id", NullValueHandling = NullValueHandling.Ignore)]
public string Id { get; set; }
public DateTime Expires { get; set; }
public string Issuer { get; set; }
}
Now we will create a service that uses the RethinkDb connection factory and implements the database initialization logic:
public class RethinkDbStore : IRethinkDbStore
{
private readonly IRethinkDbConnectionFactory _connectionFactory;
private static RethinkDB R = RethinkDB.R;
private string _dbName;
public RethinkDbStore(IRethinkDbConnectionFactory connectionFactory)
{
_connectionFactory = connectionFactory;
_dbName = connectionFactory.GetOptions().Database;
}
public void InitializeDatabase()
{
// database
CreateDb(_dbName);
// tables
CreateTable(_dbName, nameof(Token));
CreateTable(_dbName, nameof(Issuer));
// indexes
CreateIndex(_dbName, nameof(Token), nameof(Token.Issuer));
CreateIndex(_dbName, nameof(Issuer), nameof(Issuer.Name));
}
protected void CreateDb(string dbName)
{
var conn = _connectionFactory.CreateConnection();
var exists = R.DbList().Contains(db => db == dbName).Run(conn);
if (!exists)
{
R.DbCreate(dbName).Run(conn);
R.Db(dbName).Wait_().Run(conn);
}
}
protected void CreateTable(string dbName, string tableName)
{
var conn = _connectionFactory.CreateConnection();
var exists = R.Db(dbName).TableList().Contains(t => t == tableName).Run(conn);
if (!exists)
{
R.Db(dbName).TableCreate(tableName).Run(conn);
R.Db(dbName).Table(tableName).Wait_().Run(conn);
}
}
protected void CreateIndex(string dbName, string tableName, string indexName)
{
var conn = _connectionFactory.CreateConnection();
var exists = R.Db(dbName).Table(tableName).IndexList().Contains(t => t == indexName).Run(conn);
if (!exists)
{
R.Db(dbName).Table(tableName).IndexCreate(indexName).Run(conn);
R.Db(dbName).Table(tableName).IndexWait(indexName).Run(conn);
}
}
public void Reconfigure(int shards, int replicas)
{
var conn = _connectionFactory.CreateConnection();
var tables = R.Db(_dbName).TableList().Run(conn);
foreach (string table in tables)
{
R.Db(_dbName).Table(table).Reconfigure().OptArg("shards", shards).OptArg("replicas", replicas).Run(conn);
R.Db(_dbName).Table(table).Wait_().Run(conn);
}
}
}
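The IRethinkDbStore interface implemented by this class is not listed in the guide. A minimal sketch covering only the public methods shown so far (an assumption; the real interface in the TokenGen project may differ) is:

```csharp
// Assumed shape, inferred from the public members of RethinkDbStore above.
public interface IRethinkDbStore
{
    // Creates the database, tables and indexes if they are missing.
    void InitializeDatabase();

    // Applies the given shard and replica counts to every table in the database.
    void Reconfigure(int shards, int replicas);
}
```

Each public method added to RethinkDbStore later in this guide would also need a matching declaration here so that controllers and Startup can call it through the interface.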
The InitializeDatabase method checks whether the TokenStore database, tables and indexes are present on the cluster and creates them if they are missing. Because these operations are expensive, you should call this method only once, at application start.
On application startup, register IRethinkDbStore as a singleton and invoke InitializeDatabase:
public void ConfigureServices(IServiceCollection services)
{
//...
services.Configure<RethinkDbOptions>(Configuration.GetSection("RethinkDbDev"));
services.AddSingleton<IRethinkDbConnectionFactory, RethinkDbConnectionFactory>();
services.AddSingleton<IRethinkDbStore, RethinkDbStore>();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory,
IRethinkDbStore store)
{
//...
store.InitializeDatabase();
}
When InitializeDatabase runs, the tables are created with the default configuration of one shard and one replica. Earlier we set up a RethinkDB cluster with 4 nodes, so to make our tables fault tolerant we need to replicate our data on all 4 nodes. To do this, call store.Reconfigure(shards: 1, replicas: 4) after the database initialization.
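As a rough way to reason about the replica count: the RethinkDB failover documentation states that automatic failover needs at least three replicas for a table and that a majority of those replicas must remain available. The sketch below only does that arithmetic; FailoverMath and its methods are hypothetical names and not part of the RethinkDB driver.

```csharp
using System;

// Hypothetical helper, for illustration only.
public static class FailoverMath
{
    // Smallest number of replicas that is still more than half of them.
    public static int Majority(int replicas)
    {
        if (replicas < 1)
            throw new ArgumentOutOfRangeException(nameof(replicas));
        return replicas / 2 + 1;
    }

    // How many replicas can be lost while a majority is still available.
    public static int ToleratedFailures(int replicas)
    {
        return replicas - Majority(replicas);
    }
}
```

By this calculation, 4 replicas tolerate the loss of one node, the same as 3 replicas, and 5 replicas are needed to survive two simultaneous failures.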
When a TokenGen app instance starts, it has to register itself in the database as an issuer. We will use the MachineName to identify the issuer. When Docker starts a new container, the MachineName gets populated with the container ID, so we will use this ID to track the load of each app instance.
First we create a method to persist issuers:
public class RethinkDbStore : IRethinkDbStore
{
    //..
    public string InsertOrUpdateIssuer(Issuer issuer)
    {
        var conn = _connectionFactory.CreateConnection();
        // look up existing issuers by name using the secondary index
        Cursor<Issuer> all = R.Db(_dbName).Table(nameof(Issuer))
            .GetAll(issuer.Name)[new { index = nameof(Issuer.Name) }]
            .RunCursor<Issuer>(conn);
        var issuers = all.ToList();
        if (issuers.Count > 0)
        {
            // update
            R.Db(_dbName).Table(nameof(Issuer)).Get(issuers.First().Id).Update(issuer).RunResult(conn);
            return issuers.First().Id;
        }
        else
        {
            // insert
            var result = R.Db(_dbName).Table(nameof(Issuer))
                .Insert(issuer)
                .RunResult(conn);
            return result.GeneratedKeys.First().ToString();
        }
    }
}
This method uses the Issuer.Name secondary index we created earlier to search for an issuer by name. If the issuer exists, the method updates its version and timestamp; otherwise it inserts a new one.
Note that the update will only happen when you run the app from Visual Studio. On Docker Swarm, when you scale up a service, each instance has a unique ID.
We can now call this method on application start:
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory,
IRethinkDbStore store)
{
//...
// create TokenStore database, tables and indexes if not exists
store.InitializeDatabase();
// register issuer
store.InsertOrUpdateIssuer(new Issuer
{
Name = Environment.MachineName,
Version = PlatformServices.Default.Application.ApplicationVersion,
Timestamp = DateTime.UtcNow
});
}
Now that we've implemented the issuer persistence, we can do the same for the tokens by adding the following method to RethinkDbStore:
public void InsertToken(Token token)
{
    var conn = _connectionFactory.CreateConnection();
    var result = R.Db(_dbName).Table(nameof(Token))
        .Insert(token)
        .RunResult(conn);
}
Edit the TokenController and call InsertToken before sending the token to the client:
[Route("api/[controller]")]
public class TokenController : Controller
{
    private IRethinkDbStore _store;
    public TokenController(IRethinkDbStore store)
    {
        _store = store;
    }
    [HttpGet]
    public Token Get()
    {
        var token = new Token
        {
            Id = Guid.NewGuid().ToString(),
            Expires = DateTime.UtcNow.AddHours(1),
            Issuer = Environment.MachineName
        };
        // persist the token before returning it to the client
        _store.InsertToken(token);
        return token;
    }
}
So far we have registered each app instance as an issuer at startup and persisted the generated tokens. Let's go further and create a report listing each issuer and the number of tokens it has generated.
First we create an IssuerStatus model to hold our report data:
public class IssuerStatus
{
public string Name { get; set; }
public string Version { get; set; }
public DateTime RegisterDate { get; set; }
public long TotalTokensIssued { get; set; }
}
Then we create a method in RethinkDbStore to build our report:
public List<IssuerStatus> GetIssuerStatus()
{
var conn = _connectionFactory.CreateConnection();
Cursor<Issuer> all = R.Db(_dbName).Table(nameof(Issuer)).RunCursor<Issuer>(conn);
var list = all.OrderByDescending(f => f.Timestamp)
.Select(f => new IssuerStatus
{
Name = f.Name,
RegisterDate = f.Timestamp,
Version = f.Version,
TotalTokensIssued = R.Db(_dbName).Table(nameof(Token))
.GetAll(f.Name)[new { index = nameof(Token.Issuer) }]
.Count()
.Run<long>(conn)
}).ToList();
return list;
}
We are using the Token.Issuer secondary index to count all tokens belonging to the same issuer.
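To make the shape of the report concrete, the following sketch computes the same result over in-memory lists with LINQ to Objects. It is not a RethinkDB query: the model classes are simplified copies of the ones in this guide, and IssuerReport is a hypothetical helper used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified copies of the guide's models (no JSON attributes).
public class Issuer
{
    public string Name { get; set; }
    public string Version { get; set; }
    public DateTime Timestamp { get; set; }
}

public class Token
{
    public string Id { get; set; }
    public DateTime Expires { get; set; }
    public string Issuer { get; set; }
}

public class IssuerStatus
{
    public string Name { get; set; }
    public string Version { get; set; }
    public DateTime RegisterDate { get; set; }
    public long TotalTokensIssued { get; set; }
}

// Hypothetical in-memory equivalent of GetIssuerStatus.
public static class IssuerReport
{
    public static List<IssuerStatus> Build(IEnumerable<Issuer> issuers, IEnumerable<Token> tokens)
    {
        // Count tokens per issuer name in a single pass.
        // Assumes every token has a non-null Issuer value.
        var counts = tokens
            .GroupBy(t => t.Issuer)
            .ToDictionary(g => g.Key, g => g.LongCount());

        // Newest issuers first; issuers without tokens get a count of zero.
        return issuers
            .OrderByDescending(i => i.Timestamp)
            .Select(i => new IssuerStatus
            {
                Name = i.Name,
                Version = i.Version,
                RegisterDate = i.Timestamp,
                TotalTokensIssued = counts.TryGetValue(i.Name, out var n) ? n : 0
            })
            .ToList();
    }
}
```

The database version above runs one count query per issuer, while this sketch groups the tokens once. Whether a grouped query is worth it on the cluster depends on how many issuers and tokens you expect.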
Copyright Stefan Prodan