Introduction
NATS JetStream is a data streaming feature built on top of the core NATS messaging system. It adds message persistence, replay, and acknowledgements for building scalable and durable data streaming applications. Here is a step-by-step introduction to implementing NATS JetStream in Go.
Install NATS Server
There are several ways to install NATS; the main options are:
- Docker approach (https://docs.nats.io/running-a-nats-service/introduction/installation#installing-via-docker)
- Download the executable file and run it on the local machine (https://github.com/nats-io/nats-server/releases/)
- Install from source (go install github.com/nats-io/nats-server/v2@latest). The NATS CLI is a separate tool, installed with go install github.com/nats-io/natscli/nats@latest
In this article, we use the downloaded executable. Whichever option you choose, the remaining steps are the same.
1. Start NATS Server and Enable JetStream Option
- Once you have downloaded nats-server-v2.10.7-darwin-arm64.zip, extract it and run the "nats-server" binary in a new terminal window (on Windows the binary is named "nats-server.exe").
- Start the server with JetStream enabled by passing the -js flag: nats-server -js
2. Connect NATS from Code
import (
	"log"

	"github.com/nats-io/nats.go"
)

// JetStreamInit connects to NATS and returns a JetStream context,
// creating the stream first if it does not exist yet.
func JetStreamInit() (nats.JetStreamContext, error) {
	// Connect to NATS
	log.Printf("Connect to NATS")
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		return nil, err
	}
	log.Printf("Connected to NATS")

	// Create JetStream Context (at most 256 async publishes in flight)
	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
	if err != nil {
		nc.Close()
		return nil, err
	}

	// Create the stream if it does not exist.
	// The stream can also be created with the NATS CLI;
	// run "nats cheat" to see example commands.
	if err := CreateStream(js); err != nil {
		nc.Close()
		return nil, err
	}
	return js, nil
}
3. Create a Stream
Our proof of concept uses the following stream and subject names.
package config

const (
	StreamName           = "MESSAGE"
	StreamSubjects       = "MESSAGE.*"
	SubjectNameRequested = "MESSAGE.requested"
	SubjectNameResponded = "MESSAGE.responded"
)
Code Explanation
- The stream captures every subject that matches MESSAGE.*, so the stream name acts as a root and each subject as a child node.
- I created two subjects under it, requested and responded, so that a subscriber receives only the messages published to the subject it subscribes to.
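To make the wildcard in MESSAGE.* concrete, here is a minimal sketch of NATS subject matching using only the standard library. The function subjectMatches is a hypothetical helper written for illustration; the NATS server performs this matching itself, and the sketch does not validate subjects (empty tokens, for example).

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject is covered by pattern under the
// NATS wildcard rules: "*" matches exactly one token, and ">" matches one
// or more trailing tokens. Hypothetical helper, not part of nats.go.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least one token to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	subjects := []string{
		"MESSAGE.requested",
		"MESSAGE.responded",
		"MESSAGE.requested.eu", // two tokens after MESSAGE
		"ORDERS.requested",     // different root
	}
	for _, subj := range subjects {
		fmt.Printf("%-22s matched by MESSAGE.*: %t\n", subj, subjectMatches("MESSAGE.*", subj))
	}
}
```

With this configuration, a subject with more than one token after MESSAGE is not captured by the stream; a pattern such as MESSAGE.> would be needed for that.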
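// CreateStream creates the stream if it does not exist yet.
// It needs the standard library "errors" package in addition to log and nats.
func CreateStream(jetStream nats.JetStreamContext) error {
	_, err := jetStream.StreamInfo(config.StreamName)
	if err == nil {
		// stream already exists
		return nil
	}
	if !errors.Is(err, nats.ErrStreamNotFound) {
		// a real failure, not just a missing stream
		return err
	}
	// stream not found, create it
	log.Printf("Creating stream: %s\n", config.StreamName)
	_, err = jetStream.AddStream(&nats.StreamConfig{
		Name:     config.StreamName,
		Subjects: []string{config.StreamSubjects},
	})
	return err
}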
4. Publish Messages
Now that we have a stream and its subjects, it's time to publish a message to JetStream.
func publish(js nats.JetStreamContext) {
	log.Printf("publish")
	// Publish takes a []byte payload and returns a publish ack plus an error
	_, err := js.Publish(config.SubjectNameRequested, []byte("Requested Msg"))
	if err != nil {
		log.Println(err)
	} else {
		log.Printf("Publishing => Message")
	}
}
5. Create Consumers
func consume(js nats.JetStreamContext) {
	_, err := js.Subscribe(config.SubjectNameRequested, func(m *nats.Msg) {
		// Acknowledge explicitly because the subscription uses ManualAck
		err := m.Ack()
		if err != nil {
			log.Println("Unable to Ack", err)
			return
		}
		log.Printf("Subscriber / Consumer => Subject: %s, Data: %s\n", m.Subject, m.Data)
	}, nats.ManualAck())
	if err != nil {
		log.Println("Subscribe failed", err)
		return
	}
}
6. Acknowledge Messages
Streams can acknowledge receipt of a message. If you send a Request() to a subject covered by the Stream's configuration, the service replies once it has stored the message. If you just publish with a core NATS Publish(), it does not. The JetStream context's Publish used above waits for this acknowledgement on your behalf. Acknowledgements can be disabled by setting NoAck to true in the Stream's configuration.
7. Build and Run
Finally, build and run.
Conclusion
Implementing NATS JetStream involves configuring and enabling JetStream in the NATS Server, creating streams for durable message storage, publishing messages to defined streams, and setting up consumers to efficiently consume and acknowledge messages. With additional features like acknowledgment modes and consumer rate limiting, NATS JetStream provides a robust foundation for building scalable and resilient data streaming applications. Explore the documentation for more advanced configurations and tailor the implementation to meet specific use case requirements.