Let’s do some tracing in Node. (I have a post on Tracing in SpringBoot here Tracing is an essential part of a microservice architecture. You need to be able to examine an event lifecycle. Events flow through many microservices, transforming, forking, and merging. In a monolith, we can log the events in one log file and see everything that is going on. In a microservice architecture, we need a way to tie together log messages generated by a unique event. In this example, we will use Zipkin to record the events. Let’s get started.
To test our tracing, we need to create two microservices, a Web App and a Service App.
The Web App will have two REST endpoints
The Service App will have one REST endpoint and a Kafka consumer
Get started
mkdir node-tracing
cd node-tracing
npm init -y
yarn add koa koa-body koa-combine-routers koa-router node-fetch uuidv4 kafkajs
Add these scripts
package.json
"scripts": {
"dev:webapp": "npx nodemon webapp/server.js",
"start:webapp": "node webapp/server.js",
"dev:service": "npx nodemon service/server.js",
"start:service": "node service/server.js",
"test": "echo \"Error: no test specified\" && exit 1"
},
Create an entry point.
webapp/server.js
const Koa = require('koa')
const router = require('./routes/router')
const bodyParser = require('koa-body')
const {
kafkaServer
} = require('./messaging/kafka')
const app = new Koa()
app.use(bodyParser())
app.use(router())
kafkaServer.run().catch(console.error)
app.listen(3000)
Add the routes.
webapp/routes/router.js
const TodoRouter = require('./todos')
const combineRouters = require('koa-combine-routers')
const router = combineRouters(TodoRouter)
module.exports = router
webapp/routes/todos.js
const Router = require('koa-router')
const fetch = require('node-fetch')
const url = 'http://localhost:3001/todos'
const {
v4: uuidv4
} = require('uuid')
const {
producer
} = require('../messaging/kafka')
const router = Router({
prefix: '/todos'
})
router.get('/', async (ctx) => {
try {
const response = await fetch(url)
const json = await response.json()
ctx.body = json
} catch (error) {
console.log(error)
ctx.body = 'failure'
}
})
router.post('/', async (ctx) => {
const todoMessage = {
description: ctx.request.body.description,
transactionId: uuidv4(),
}
console.log('hello', todoMessage)
await producer.send({
topic: 'todo',
messages: [{
value: JSON.stringify(todoMessage)
}],
})
ctx.body = {
success: true
}
})
module.exports = router
Add Kafka
webapp/messaging/kafka.js
const {
Kafka
} = require('kafkajs')
const queueName = 'createTodoQueue'
const kafkaBrokerUrl = 'localhost:9092'
const kafka = new Kafka({
clientId: queueName,
brokers: [kafkaBrokerUrl],
}),
const producer = kafka.producer()
const kafkaServer = {
run: async () => {},
}
module.exports = {
kafkaServer,
producer
}
Create an entry point.
service/server.js
const Koa = require('koa')
const router = require('./routes/router')
const {
kafkaServer
} = require('./messaging/kafka')
const app = new Koa()
app.use(router())
kafkaServer.run().catch(console.error)
app.listen(3001)
Add the routes.
service/routes/router.js
const KoaRouter = require('koa-router')
const TodoRouter = require('./todos')
const combineRouters = require('koa-combine-routers')
const router = combineRouters(TodoRouter)
module.exports = router
service/routes/todos.js
var Router = require('koa-router')
const todoStore = require('../todoStore')
const router = Router({
prefix: '/todos'
})
router.get('/', async (ctx) => {
ctx.body = todoStore.getTodos()
})
module.exports = router
Add Kafka
service/messaging/kafka.js
const {
Kafka
} = require('kafkajs')
const todoStore = require('../todoStore')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'],
})
const consumer = kafka.consumer({
groupId: 'group_id'
})
const kafkaServer = {
run: async () => {
// Consuming
await consumer.connect()
await consumer.subscribe({
topic: 'todo',
fromBeginning: true
})
await consumer.run({
eachMessage: async ({
topic,
partition,
message
}) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
})
try {
const payload = JSON.parse(message.value.toString())
const newTodo = {
description: payload.description,
completed: false,
}
todoStore.addTodo(newTodo)
} catch (error) {
console.error('Unable to process messasge', message.value.toString())
}
},
})
},
}
module.exports = {
kafkaServer
}
Add a Todo store
service/todoStore.js
let todos = [{
description: 'Get Milk',
completed: false
},
{
description: 'Get Bread',
completed: false
},
]
const todoStore = {
getTodos: () => {
return todos
},
addTodo: (todo) => {
console.log('createTodos', todo)
todos.push(todo)
},
}
module.exports = todoStore
Use docker to run Kafka with docker-compose.
docker-compose.yml
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
kafka:
image: wurstmeister/kafka
ports:
- '9092:9092'
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
We should be able to run and the Web App and Service App.
docker-compose up
yarn dev:webapp
yarn dev:service
curl --request POST \
--url http://localhost:3000/todos \
--header 'content-type: application/json' \
--data '{"description":"remember this"}'
We are finally ready to start to add tracing. We need these dependencies.
yarn add zipkin-context-cls zipkin-transport-http zipkin-instrumentation-fetch zipkin-instrumentation-kafkajs zipkin-instrumentation-koa
Tracers send traces events to Zipkin Note that in the Web App we wrap fetch as zipkinServiceFetch
/webapp/tracer.js
const {
Tracer,
BatchRecorder,
jsonEncoder: {
JSON_V2
},
} = require('zipkin')
const CLSContext = require('zipkin-context-cls')
const {
HttpLogger
} = require('zipkin-transport-http')
const fetch = require('node-fetch')
const wrapFetch = require('zipkin-instrumentation-fetch')
const serviceName = 'my-webapp'
const ctxImpl = new CLSContext('zipkin')
const recorder = new BatchRecorder({
logger: new HttpLogger({
endpoint: 'http://localhost:9411/api/v2/spans',
jsonEncoder: JSON_V2,
}),
})
const tracer = new Tracer({
recorder,
ctxImpl,
localServiceName: serviceName,
})
const remoteServiceName = 'my-service'
const zipkinServiceFetch = wrapFetch(fetch, {
tracer,
remoteServiceName
})
module.exports = {
tracer,
serviceName,
zipkinServiceFetch
}
/service/tracer.js
const {
Tracer,
BatchRecorder,
jsonEncoder: {
JSON_V2
},
} = require('zipkin')
const CLSContext = require('zipkin-context-cls')
const {
HttpLogger
} = require('zipkin-transport-http')
const serviceName = 'my-service'
const ctxImpl = new CLSContext('zipkin')
const recorder = new BatchRecorder({
logger: new HttpLogger({
endpoint: 'http://localhost:9411/api/v2/spans',
jsonEncoder: JSON_V2,
}),
})
const tracer = new Tracer({
recorder,
ctxImpl,
localServiceName: serviceName,
})
module.exports = {
tracer,
serviceName
}
Connect the tracers with Koa middleware.
/webapp/server.js
const Koa = require('koa')
const router = require('./routes/router')
const bodyParser = require('koa-body')
const {
koaMiddleware
} = require('zipkin-instrumentation-koa')
const {
kafkaServer
} = require('./messaging/kafka')
const {
tracer,
serviceName
} = require('./tracer')
const app = new Koa()
app.use(bodyParser())
app.use(koaMiddleware({
tracer,
serviceName
}))
app.use(router())
kafkaServer.run().catch(console.error)
app.listen(3000)
/service/server.js
const Koa = require('koa')
const router = require('./routes/router')
const {
koaMiddleware
} = require('zipkin-instrumentation-koa')
const {
kafkaServer
} = require('./messaging/kafka')
const {
tracer,
serviceName
} = require('./tracer')
const app = new Koa()
app.use(koaMiddleware({
tracer,
serviceName
}))
app.use(router())
kafkaServer.run().catch(console.error)
app.listen(3001)
Update the Web App routes and Kafka.
In the Todo route, we replace the fetch implementation with an instrumented one.
webapp/routes/todo.js
var Router = require('koa-router')
const {
zipkinServiceFetch: fetch
} = require('../tracer')
const url = 'http://localhost:3001/todos'
const {
v4: uuidv4
} = require('uuid')
const {
producer
} = require('../messaging/kafka')
const router = Router({
prefix: '/todos'
})
router.get('/', async (ctx) => {
try {
const response = await fetch(url)
const json = await response.json()
ctx.body = json
} catch (error) {
console.log(error)
ctx.body = 'failure'
}
})
router.post('/', async (ctx) => {
const todoMessage = {
description: ctx.request.body.description,
transactionId: uuidv4(),
}
console.log('hello', todoMessage)
await producer.send({
topic: 'todo',
messages: [{
value: JSON.stringify(todoMessage)
}],
})
ctx.body = {
success: true
}
})
module.exports = router
For Kafka, we also need to replace it with an instrumented one.
webapp/messaging/kafka.js
const {
Kafka
} = require('kafkajs')
const instrumentKafkaJs = require('zipkin-instrumentation-kafkajs')
const {
tracer
} = require('../tracer')
const kafka = instrumentKafkaJs(
new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'],
}), {
tracer, // Your zipkin tracer instance
remoteServiceName: 'my-app', // This should be the symbolic name of the broker, not a consumer.
}
)
const producer = kafka.producer()
const kafkaServer = {
run: async () => {},
}
module.exports = {
kafkaServer,
producer
}
Update the Service App Kafka. For Kafka, we also need to replace it with an instrumented one.
service/messaging/kafka.js
const {
Kafka
} = require('kafkajs')
const todoStore = require('../todoStore')
const instrumentKafkaJs = require('zipkin-instrumentation-kafkajs')
const {
tracer
} = require('../tracer')
const kafka = instrumentKafkaJs(
new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'],
}), {
tracer, // Your zipkin tracer instance
remoteServiceName: 'serviceName', // This should be the symbolic name of the broker, not a consumer.
}
)
const consumer = kafka.consumer({
groupId: 'group_id'
})
const kafkaServer = {
run: async () => {
// Consuming
await consumer.connect()
await consumer.subscribe({
topic: 'todo',
fromBeginning: true
})
await consumer.run({
eachMessage: async ({
topic,
partition,
message
}) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
})
try {
const payload = JSON.parse(message.value.toString())
const newTodo = {
description: payload.description,
completed: false,
}
todoStore.addTodo(newTodo)
} catch (error) {
console.error('Unable to process messasge', message.value.toString())
}
},
})
},
}
module.exports = {
kafkaServer
}
Update docker-compose to include Zipkin
docker-compose.ml
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
kafka:
image: wurstmeister/kafka
ports:
- '9092:9092'
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
zipkin:
image: openzipkin/zipkin
container_name: zipkin
ports:
- '9411:9411'
We should be able to run and the Web App and Service App.
docker-compose up
yarn dev:webapp
yarn dev:service
curl --request POST \
--url http://localhost:3000/todos \
--header 'content-type: application/json' \
--data '{"description":"remember this"}'