Introduction
In this article, we'll learn how to stream data to clients with SignalR using ASP.NET Core and Angular 5. We will go through the channel reader /writer which helps in reading/writing into a channel. The channels play a vital role in streaming data using SignalR. Streaming data is the type of consumer /producer pattern. Let's say the producer is writing data to a channel and the consumer is reading data from the channel. You can choose how to write and read the data from channels so it is memory efficient and high performance. Let us say you are producing data on the channel but somehow a consumer is no longer available to consume the data. In such a case, the memory will increase so you can restrict the channel context to be bound to some limit. Will discuss each of the channels in details below.
The channels were introduced in DOTNET Core 2.1 in "System.Threading.Channels" namespace under the CoreFX (https://github.com/dotnet/corefx) project. The channels are better than data flow and pipeline stream. They are more elegant performers in consumer producer queue patterns and with simple and powerful APIs.
This article is a part of the series on SignalR using ASPNET Core.
- Overview of New Stack SignalR on ASPNET Core here
- Getting Started With SignalR Using ASP.NET Core And Angular 5 here
- Working with Azure service
- Working with xamarin
- Working with dynamic hubs
This article demonstrates the following.
- Channel reader/ writer
- Bounded / unbounded/Un Buffered Channels
- Creating angular service to read the stream
- Streaming Demo
Prerequisite
You must have the following software,
The source code is available at GitHub.
Channels
The System.Threading.Tasks.Channels library provides a set of synchronization data structures for passing the data between producers and consumers. Whereas the existing System.Threading.Tasks.Dataflow library is focused on pipelining and connecting together dataflow "blocks" which encapsulates both storage and processing. System.Threading.Tasks.Channels is focused purely on the storage aspect with data structures used to provide the hand-offs between the participants explicitly coded to use the storage. The library is designed to be used with async/await in C#.
Core channels
- Unbound Channel
Used to create a buffered, unbound channel. The channel may be used concurrently by any number of readers and writers, and is unbounded in size, limited only by available memory.
- Bound channel
Used to create a buffered channel that stores at most a specified number of items. The channel may be used concurrently by any number of reads and writers.
- UnBuffered Channel
Used to create an unbuffered channel. Such a channel has no internal storage for T items, instead, it has the ability to pairing up writers and readers to rendezvous and directly hand off their data from one to the other. TryWrite operations will only succeed if there's currently an outstanding ReadAsync operation, and conversely, TryRead operations will only succeed if there's currently an outstanding WriteAsync operation.
At the end of this article , you will be able to achieve the below demo.
If you haven't read about "How to get started with SignalR using ASPNET Core and Angular 5",
click here.
In the previous article , we have learned how to create Angular SPA template and we have also seen the start up configuration and npm package installation. The purpose of this article is to work with streaming data in SignalR.
SignalR Startup Configuration
You need to configure the SignalR service in "Configure Service" section and Map the hub in configure section. It automatically adds the SignalR reference for you.
Add SignalR service in Configure service method
-
- public void ConfigureServices(IServiceCollection services)
- {
- services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
-
-
- services.AddSpaStaticFiles(configuration =>
- {
- configuration.RootPath = "ClientApp/dist";
- });
-
- services.AddSignalR();
-
- services.AddSingleton<StockTicker>();
- }
Configure Method
- app.UseSignalR(route =>
- {
- route.MapHub<ChatHub>("/chathub");
- route.MapHub<StockTickerHub>("/stock");
- });
After configuring SignalR in Startup we need to install SignalR Javascript library using NPM Package Manager. Run the below command in Command Prompt.
Command
- npm init -y
- npm install @aspnet/signalr
- npm init to initialize or set the project npm package. -y | --yes to skip the questionnaire.
You have to run the above commands in Package manager console (Tools --> Nuget Package Manager --> Package Manager Console)
Now, we are going to create a separate hub for data streaming so I'm going to create a stock ticker hub. The Hub is taking care of communication between client and server.
Creating Stock Hub
We are going to create stock hub class which inherits hub class. The hub has clients API to talk with clients so you can easily invoke. I've created a separate class for stock ticker which is take care about the list of stock and streaming data.
-
-
-
- public class StockTickerHub : Hub
- {
- private StockTicker _stockTicker;
-
- public StockTickerHub(StockTicker stockTicker)
- {
- this._stockTicker = stockTicker;
- }
-
- public IEnumerable<Stock> GetAllStocks()
- {
- return _stockTicker.GetAllStocks();
- }
-
- public ChannelReader<Stock> StreamStocks()
- {
- return _stockTicker.StreamStocks().AsChannelReader(10);
- }
-
- public string GetMarketState()
- {
- return _stockTicker.MarketState.ToString();
- }
-
- public async Task OpenMarket()
- {
- await _stockTicker.OpenMarket();
- }
-
- public async Task CloseMarket()
- {
- await _stockTicker.CloseMarket();
- }
-
- public async Task Reset()
- {
- await _stockTicker.Reset();
- }
- }
Stock Ticker Class
Which is taking care about loading stocks and writing data to stream
- public class StockTicker
- {
- private readonly SemaphoreSlim _marketStateLock = new SemaphoreSlim(1, 1);
- private readonly SemaphoreSlim _updateStockPricesLock = new SemaphoreSlim(1, 1);
-
- private readonly ConcurrentDictionary<string, Stock> _stocks = new ConcurrentDictionary<string, Stock>();
-
- private readonly Subject<Stock> _subject = new Subject<Stock>();
-
-
- private readonly double _rangePercent = 0.002;
-
- private readonly TimeSpan _updateInterval = TimeSpan.FromMilliseconds(250);
- private readonly Random _updateOrNotRandom = new Random();
-
- private Timer _timer;
- private volatile bool _updatingStockPrices;
- private volatile MarketState _marketState;
-
- public StockTicker(IHubContext<StockTickerHub> hub)
- {
- Hub = hub;
- LoadDefaultStocks();
- }
-
- private IHubContext<StockTickerHub> Hub
- {
- get;
- set;
- }
-
- public MarketState MarketState
- {
- get { return _marketState; }
- private set { _marketState = value; }
- }
-
- public IEnumerable<Stock> GetAllStocks()
- {
- return _stocks.Values;
- }
-
- public IObservable<Stock> StreamStocks()
- {
- return _subject;
- }
-
- public async Task OpenMarket()
- {
- await _marketStateLock.WaitAsync();
- try
- {
- if (MarketState != MarketState.Open)
- {
- _timer = new Timer(UpdateStockPrices, null, _updateInterval, _updateInterval);
-
- MarketState = MarketState.Open;
-
- await BroadcastMarketStateChange(MarketState.Open);
- }
- }
- finally
- {
- _marketStateLock.Release();
- }
- }
-
- public async Task CloseMarket()
- {
- await _marketStateLock.WaitAsync();
- try
- {
- if (MarketState == MarketState.Open)
- {
- if (_timer != null)
- {
- _timer.Dispose();
- }
-
- MarketState = MarketState.Closed;
-
- await BroadcastMarketStateChange(MarketState.Closed);
- }
- }
- finally
- {
- _marketStateLock.Release();
- }
- }
-
- public async Task Reset()
- {
- await _marketStateLock.WaitAsync();
- try
- {
- if (MarketState != MarketState.Closed)
- {
- throw new InvalidOperationException("Market must be closed before it can be reset.");
- }
-
- LoadDefaultStocks();
- await BroadcastMarketReset();
- }
- finally
- {
- _marketStateLock.Release();
- }
- }
-
- private void LoadDefaultStocks()
- {
- _stocks.Clear();
-
- var stocks = new List<Stock>
- {
- new Stock { Symbol = "HDFC Bank", Price = 2049.35m },
- new Stock { Symbol = "Bharti Airtel", Price = 377.55m },
- new Stock { Symbol = "SBI", Price = 273.00m },
- new Stock { Symbol = "Reliance", Price = 984.35m }
- };
-
- stocks.ForEach(stock => _stocks.TryAdd(stock.Symbol, stock));
- }
-
- private async void UpdateStockPrices(object state)
- {
-
- await _updateStockPricesLock.WaitAsync();
- try
- {
- if (!_updatingStockPrices)
- {
- _updatingStockPrices = true;
-
- foreach (var stock in _stocks.Values)
- {
- TryUpdateStockPrice(stock);
-
- _subject.OnNext(stock);
- }
-
- _updatingStockPrices = false;
- }
- }
- finally
- {
- _updateStockPricesLock.Release();
- }
- }
-
- private bool TryUpdateStockPrice(Stock stock)
- {
-
- var r = _updateOrNotRandom.NextDouble();
- if (r > 0.1)
- {
- return false;
- }
-
-
- var random = new Random((int)Math.Floor(stock.Price));
- var percentChange = random.NextDouble() * _rangePercent;
- var pos = random.NextDouble() > 0.51;
- var change = Math.Round(stock.Price * (decimal)percentChange, 2);
- change = pos ? change : -change;
-
- stock.Price += change;
- return true;
- }
-
- private async Task BroadcastMarketStateChange(MarketState marketState)
- {
- switch (marketState)
- {
- case MarketState.Open:
- await Hub.Clients.All.SendAsync("marketOpened");
- break;
- case MarketState.Closed:
- await Hub.Clients.All.SendAsync("marketClosed");
- break;
- default:
- break;
- }
- }
-
- private async Task BroadcastMarketReset()
- {
- await Hub.Clients.All.SendAsync("marketReset");
- }
- }
-
- public enum MarketState
- {
- Closed,
- Open
- }
Creating Stock Angular Service
Now, I'm going to create stock signalR service for the Angular client to subscribe the stream and call any method. This service has three components.
- Create Connection - take care about creating connection with hub
- Register Server Events - registering on Events like receive message
- Start the connection and subscribe for stream
- import { EventEmitter, Injectable } from '@angular/core';
- import { HubConnection,HubConnectionBuilder, IStreamResult } from '@aspnet/signalr'
-
-
- @Injectable()
- export class stockSignalRService {
- connectionEstablished = new EventEmitter<Boolean>();
- marketOpened = new EventEmitter<Boolean>();
- marketClosed = new EventEmitter<Boolean>();
-
- private connectionIsEstablished = false;
- private _stockHubConnection: HubConnection;
-
-
- constructor() {
- this.createConnection();
- this.registerOnServerEvents();
- this.startConnection();
- }
-
- private createConnection() {
- this._stockHubConnection = new HubConnectionBuilder()
- .withUrl('/stock')
- .build();
- }
-
- private startConnection(): void {
- this._stockHubConnection
- .start()
- .then(() => {
- this.connectionIsEstablished = true;
- console.log('stock connection started');
- this.connectionEstablished.emit(true);
- }).catch(err => {
- setTimeout(this.startConnection(), 5000);
- });
- }
-
- private registerOnServerEvents(): void {
- this._stockHubConnection.on("marketOpened", () => {
- console.log("marketOpened");
- this.marketOpened.emit(true);
- });
-
- this._stockHubConnection.on("marketClosed",() => {
- console.log("marketClosed");
- this.marketClosed.emit(true);
- });
- }
-
- public startStreaming(): IStreamResult<any> {
- return this._stockHubConnection.stream("StreamStocks");
- }
-
- public getAllStocks(): Promise<any> {
- return this._stockHubConnection.invoke("getAllStocks");
- }
-
- public openMarket() {
- this._stockHubConnection.invoke("OpenMarket");
- }
-
- public CloseMarket() {
- this._stockHubConnection.invoke("CloseMarket");
- }
-
- public ResetMarket() {
- this._stockHubConnection.invoke("Reset");
- }
-
- }
After creating the service, we have to register with the provider to access in components. Either register for a global provider in app module or for specific one at self.
Stock Component
The Component takes care of rendring the stream data.
- import { Component } from "@angular/core";
-
- import { stockSignalRService } from "../services/stock.signalR.service";
- import { forEach } from "@angular/router/src/utils/collection";
-
-
- @Component({
- templateUrl: './stock.component.html',
- selector:"app-stock"
- })
-
- export class StockComponent {
-
- stocks = [];
- marketStatus: string;
-
- constructor(private stockService: stockSignalRService) {
- this.stocks = [];
- this.marketStatus = 'closed';
-
-
- stockService.connectionEstablished.subscribe(() => {
- stockService.getAllStocks().then((data) => {
- this.stocks = data;
- });
- });
-
-
- stockService.marketOpened.subscribe(() => {
- this.marketStatus = 'open';
- this.startStrearming();
- });
-
-
- stockService.marketClosed.subscribe(() => {
- this.marketStatus = 'closed';
- });
-
- }
-
- openMarketClicked() {
- this.stockService.openMarket();
- }
-
- startStrearming() {
- this.stockService.startStreaming().subscribe({
- next: (data) => {
- this.displayStock(data);
- },
- error: function (err) {
- console.log('Error:' + err);
- },
- complete: function () {
- console.log('completed');
- }
- });
- }
-
- closeMarketClicked() {
- this.stockService.CloseMarket();
- }
-
- resetClicked() {
- this.stockService.ResetMarket();
- }
-
- displayStock(stock) {
- console.log("stock updated:" + stock.symbol);
- for (let i in this.stocks) {
-
- if (this.stocks[i].symbol == stock.symbol) {
- this.stocks[i] = stock;
- }
- }
- }
-
- }
stock.component.html
- <div>
- <br />
- <button name="openmarket" class="btn btn-primary" (click)="openMarketClicked()">Open Market</button>
- <button name="" class="btn btn-danger" (click)="closeMarketClicked()">Close Market</button>
- <button name="" class="btn btn-default" (click)="resetClicked()">Reset</button>
- <p>Stock Count : {{stocks.length}}</p>
- <p>Market Status : {{marketStatus}}</p>
-
- <table class="table table-response table-bordered">
- <thead>
- <th>Symbol</th>
- <th>Price</th>
- <th>Change %</th>
- </thead>
- <tr *ngFor="let stock of stocks">
- <td>{{stock.symbol}}</td>
- <td>{{stock.price}}</td>
- <td>{{stock.percentChange}} {{stock.percentChange >= 0 ? '▲' : '▼'}}</td>
- </tr>
- </table>
- </div>
Finally, we are finished with the streaming demo with SignalR using ASPNET Core and Angular 5.
Summary
In this article, we have seen the working demo of Streaming data with SignalR using ASP.NET Core and Angular 5. We have learned the following -
- EventEmmiter: to emit the custom event. Read more here
- HubConnection and HubConnectionBuilder : to represent SignalR hub connection and a builder for configuring HubConnection instances.
- C# Channels : consumer / Producers based high performant channels.
References