Tracing in Node

Let's do some tracing in Node. (I have a post on Tracing in SpringBoot here.) Tracing is an essential part of a microservice architecture. You need to be able to examine an event's lifecycle. Events flow through many microservices, transforming, forking, and merging. In a monolith, we can log the events in one log file and see everything that is going on. In a microservice architecture, we need a way to tie together the log messages generated by a single event. In this example, we will use Zipkin to record the events. Let's get started.

Testbed

To test our tracing, we need to create two microservices, a Web App and a Service App.

The Web App will have two REST endpoints: GET /todos and POST /todos.

The Service App will have one REST endpoint (GET /todos) and a Kafka consumer for the todo topic.

Create the Web App and Service App

Get started

mkdir node-tracing
cd node-tracing
npm init -y
# The routes load the id generator with require('uuid'), so install `uuid`.
# node-fetch is pinned to v2 because v3 is ESM-only and cannot be loaded with require().
yarn add koa koa-body koa-combine-routers koa-router node-fetch@2 uuid kafkajs

Add these scripts

package.json

"scripts": {
    "dev:webapp": "npx nodemon webapp/server.js",
    "start:webapp": "node webapp/server.js",
    "dev:service": "npx nodemon service/server.js",
    "start:service": "node service/server.js",
    "test": "echo \"Error: no test specified\" && exit 1"
  },

Web App

Create an entry point.

webapp/server.js

// Web App entry point: a Koa server listening on port 3000.
const Koa = require('koa')
const router = require('./routes/router')
const bodyParser = require('koa-body')
const { kafkaServer } = require('./messaging/kafka')

const app = new Koa()

// The body parser is registered before the routes; the POST /todos handler reads ctx.request.body.
app.use(bodyParser())
app.use(router())
// Start the Kafka side of the app; a rejected start-up is logged, not thrown.
kafkaServer.run().catch(console.error)
app.listen(3000)

Add the routes.

webapp/routes/router.js

// Combines the Web App's routers into the single value that server.js calls as router().
const TodoRouter = require('./todos')
const combineRouters = require('koa-combine-routers')

const router = combineRouters(TodoRouter)

module.exports = router

webapp/routes/todos.js

const Router = require('koa-router')
const fetch = require('node-fetch')
const url = 'http://localhost:3001/todos'
const { v4: uuidv4 } = require('uuid')
const { producer } = require('../messaging/kafka')

const router = Router({ prefix: '/todos' })

// GET /todos: fetch the todo list from the Service App and return it as-is.
router.get('/', async (ctx) => {
  try {
    const response = await fetch(url)
    const json = await response.json()
    ctx.body = json
  } catch (error) {
    console.log(error)
    // Signal the upstream failure with an error status instead of the default 200.
    ctx.status = 502
    ctx.body = 'failure'
  }
})

// POST /todos: publish the new todo on the `todo` Kafka topic.
router.post('/', async (ctx) => {
  const { description } = ctx.request.body || {}
  // Reject requests without a usable description rather than publishing an empty todo.
  if (typeof description !== 'string' || description.trim() === '') {
    ctx.status = 400
    ctx.body = { success: false, error: 'description is required' }
    return
  }

  const todoMessage = {
    description,
    transactionId: uuidv4(),
  }
  console.log('hello', todoMessage)
  await producer.send({
    topic: 'todo',
    messages: [{ value: JSON.stringify(todoMessage) }],
  })

  ctx.body = { success: true }
})

module.exports = router

Add Kafka

webapp/messaging/kafka.js

const { Kafka } = require('kafkajs')
const queueName = 'createTodoQueue'
const kafkaBrokerUrl = 'localhost:9092'

// Kafka client for the Web App, which only produces messages.
const kafka = new Kafka({
  clientId: queueName,
  brokers: [kafkaBrokerUrl],
})

const producer = kafka.producer()
const kafkaServer = {
  // Called once from server.js at start-up: the producer has to be connected
  // before the POST /todos route can call producer.send().
  run: async () => {
    await producer.connect()
  },
}

module.exports = { kafkaServer, producer }

Service App

Create an entry point.

service/server.js

// Service App entry point: a Koa server listening on port 3001.
const Koa = require('koa')
const router = require('./routes/router')
const { kafkaServer } = require('./messaging/kafka')

const app = new Koa()

app.use(router())
// Start the Kafka consumer; a rejected start-up is logged, not thrown.
kafkaServer.run().catch(console.error)

app.listen(3001)

Add the routes.

service/routes/router.js

// Combines the Service App's routers into the single value that server.js calls as router().
// NOTE(review): KoaRouter is not referenced anywhere in this file.
const KoaRouter = require('koa-router')
const TodoRouter = require('./todos')
const combineRouters = require('koa-combine-routers')

const router = combineRouters(TodoRouter)

module.exports = router

service/routes/todos.js

const Router = require('koa-router')
const todoStore = require('../todoStore')

const router = Router({ prefix: '/todos' })

// GET /todos: return every todo currently held in the store.
router.get('/', async (ctx) => {
  ctx.body = todoStore.getTodos()
})

module.exports = router

Add Kafka

service/messaging/kafka.js

const { Kafka } = require('kafkajs')
const todoStore = require('../todoStore')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
})

const consumer = kafka.consumer({ groupId: 'group_id' })
const kafkaServer = {
  // Connect the consumer, subscribe to the `todo` topic and store each message as a todo.
  run: async () => {
    await consumer.connect()
    await consumer.subscribe({ topic: 'todo', fromBeginning: true })

    await consumer.run({
      eachMessage: async ({ partition, message }) => {
        const value = message.value.toString()
        console.log({ partition, offset: message.offset, value })
        try {
          const payload = JSON.parse(value)
          todoStore.addTodo({
            description: payload.description,
            completed: false,
          })
        } catch (error) {
          // Log the cause together with the raw message; the error is deliberately not rethrown.
          console.error('Unable to process message', value, error)
        }
      },
    })
  },
}

module.exports = { kafkaServer }

Add a Todo store

service/todoStore.js

// In-memory list of todos, seeded with two examples. The binding is never
// reassigned; new items are appended in place.
const todos = [
  { description: 'Get Milk', completed: false },
  { description: 'Get Bread', completed: false },
]

const todoStore = {
  /**
   * @returns {Array<{description: string, completed: boolean}>} a copy of the
   *   current list, so callers cannot add or remove items behind the store's back.
   */
  getTodos: () => [...todos],

  /**
   * Append a todo to the store.
   * @param {{description: string, completed: boolean}} todo
   */
  addTodo: (todo) => {
    console.log('createTodos', todo)
    todos.push(todo)
  },
}

module.exports = todoStore

Docker Kafka

Use docker to run Kafka with docker-compose.

docker-compose.yml

# Local Kafka broker (plus the Zookeeper it connects to) for the two apps.
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
  kafka:
    image: wurstmeister/kafka
    ports:
      # Published on the host as localhost:9092, the broker address both apps use.
      - '9092:9092'
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      # Points the broker at the zookeeper service defined above.
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

Test it

We should now be able to run the Web App and the Service App.

# Start Kafka and Zookeeper from docker-compose.yml
docker-compose up
# Start the Web App (port 3000)
yarn dev:webapp
# Start the Service App (port 3001)
yarn dev:service
# Create a todo through the Web App's POST /todos endpoint
curl --request POST \
  --url http://localhost:3000/todos \
  --header 'content-type: application/json' \
  --data '{"description":"remember this"}'

Add Zipkin

We are finally ready to start to add tracing. We need these dependencies.

# `zipkin` itself is required directly by the tracer modules (require('zipkin')).
yarn add zipkin zipkin-context-cls zipkin-transport-http zipkin-instrumentation-fetch zipkin-instrumentation-kafkajs zipkin-instrumentation-koa

Create Tracers.

Tracers send trace events to Zipkin. Note that in the Web App we wrap fetch as zipkinServiceFetch.

/webapp/tracer.js

// Zipkin tracer for the Web App, plus a fetch wrapper for calls to the Service App.
const {
  Tracer,
  BatchRecorder,
  jsonEncoder: { JSON_V2 },
} = require('zipkin')
const CLSContext = require('zipkin-context-cls')
const { HttpLogger } = require('zipkin-transport-http')
const fetch = require('node-fetch')
const wrapFetch = require('zipkin-instrumentation-fetch')
// Name this app is registered under as the tracer's local service.
const serviceName = 'my-webapp'

const ctxImpl = new CLSContext('zipkin')
// Recorder whose logger posts JSON_V2-encoded spans to the Zipkin server on localhost:9411.
const recorder = new BatchRecorder({
  logger: new HttpLogger({
    endpoint: 'http://localhost:9411/api/v2/spans',
    jsonEncoder: JSON_V2,
  }),
})

const tracer = new Tracer({
  recorder,
  ctxImpl,
  localServiceName: serviceName,
})

// Same value as serviceName in service/tracer.js.
const remoteServiceName = 'my-service'
// Instrumented fetch; the todos route imports it in place of node-fetch.
const zipkinServiceFetch = wrapFetch(fetch, { tracer, remoteServiceName })

module.exports = { tracer, serviceName, zipkinServiceFetch }

/service/tracer.js

// Zipkin tracer for the Service App.
const {
  Tracer,
  BatchRecorder,
  jsonEncoder: { JSON_V2 },
} = require('zipkin')
const CLSContext = require('zipkin-context-cls')
const { HttpLogger } = require('zipkin-transport-http')
// Name this app is registered under as the tracer's local service.
const serviceName = 'my-service'

const ctxImpl = new CLSContext('zipkin')
// Recorder whose logger posts JSON_V2-encoded spans to the Zipkin server on localhost:9411.
const recorder = new BatchRecorder({
  logger: new HttpLogger({
    endpoint: 'http://localhost:9411/api/v2/spans',
    jsonEncoder: JSON_V2,
  }),
})

const tracer = new Tracer({
  recorder,
  ctxImpl,
  localServiceName: serviceName,
})

module.exports = { tracer, serviceName }

Connect the tracers with Koa middleware.

/webapp/server.js

// Web App entry point with Zipkin tracing: a Koa server listening on port 3000.
const Koa = require('koa')
const router = require('./routes/router')
const bodyParser = require('koa-body')
const { koaMiddleware } = require('zipkin-instrumentation-koa')
const { kafkaServer } = require('./messaging/kafka')
const { tracer, serviceName } = require('./tracer')

const app = new Koa()

app.use(bodyParser())
// The Zipkin middleware is registered ahead of the router.
app.use(koaMiddleware({ tracer, serviceName }))
app.use(router())
// Start the Kafka side of the app; a rejected start-up is logged, not thrown.
kafkaServer.run().catch(console.error)
app.listen(3000)

/service/server.js

// Service App entry point with Zipkin tracing: a Koa server listening on port 3001.
const Koa = require('koa')
const router = require('./routes/router')
const { koaMiddleware } = require('zipkin-instrumentation-koa')
const { kafkaServer } = require('./messaging/kafka')
const { tracer, serviceName } = require('./tracer')

const app = new Koa()

// The Zipkin middleware is registered ahead of the router.
app.use(koaMiddleware({ tracer, serviceName }))
app.use(router())
// Start the Kafka consumer; a rejected start-up is logged, not thrown.
kafkaServer.run().catch(console.error)

app.listen(3001)

Update the Web App routes and Kafka.

In the Todo route, we replace the fetch implementation with an instrumented one.

webapp/routes/todos.js

const Router = require('koa-router')
// Instrumented fetch from the tracer module, used under the usual `fetch` name.
const { zipkinServiceFetch: fetch } = require('../tracer')
const url = 'http://localhost:3001/todos'
const { v4: uuidv4 } = require('uuid')
const { producer } = require('../messaging/kafka')

const router = Router({ prefix: '/todos' })

// GET /todos: fetch the todo list from the Service App and return it as-is.
router.get('/', async (ctx) => {
  try {
    const response = await fetch(url)
    const json = await response.json()
    ctx.body = json
  } catch (error) {
    console.log(error)
    // Signal the upstream failure with an error status instead of the default 200.
    ctx.status = 502
    ctx.body = 'failure'
  }
})

// POST /todos: publish the new todo on the `todo` Kafka topic.
router.post('/', async (ctx) => {
  const { description } = ctx.request.body || {}
  // Reject requests without a usable description rather than publishing an empty todo.
  if (typeof description !== 'string' || description.trim() === '') {
    ctx.status = 400
    ctx.body = { success: false, error: 'description is required' }
    return
  }

  const todoMessage = {
    description,
    transactionId: uuidv4(),
  }
  console.log('hello', todoMessage)
  await producer.send({
    topic: 'todo',
    messages: [{ value: JSON.stringify(todoMessage) }],
  })

  ctx.body = { success: true }
})

module.exports = router

For Kafka, we also need to replace it with an instrumented one.

webapp/messaging/kafka.js

const { Kafka } = require('kafkajs')
const instrumentKafkaJs = require('zipkin-instrumentation-kafkajs')
const { tracer } = require('../tracer')

// Kafka client wrapped by the Zipkin instrumentation.
const kafka = instrumentKafkaJs(
  new Kafka({
    clientId: 'my-app',
    brokers: ['localhost:9092'],
  }),
  {
    tracer, // Your zipkin tracer instance
    remoteServiceName: 'my-app', // This should be the symbolic name of the broker, not a consumer.
  }
)

const producer = kafka.producer()
const kafkaServer = {
  // Called once from server.js at start-up: the producer has to be connected
  // before the POST /todos route can call producer.send().
  run: async () => {
    await producer.connect()
  },
}

module.exports = { kafkaServer, producer }

Update the Service App Kafka. For Kafka, we also need to replace it with an instrumented one.

service/messaging/kafka.js

const { Kafka } = require('kafkajs')
const todoStore = require('../todoStore')
const instrumentKafkaJs = require('zipkin-instrumentation-kafkajs')
const { tracer } = require('../tracer')

// Kafka client wrapped by the Zipkin instrumentation.
const kafka = instrumentKafkaJs(
  new Kafka({
    clientId: 'my-app',
    brokers: ['localhost:9092'],
  }),
  {
    tracer, // Your zipkin tracer instance
    // Symbolic name of the broker, not a consumer. It matches the name the
    // Web App's Kafka client uses, so both sides refer to the same broker.
    remoteServiceName: 'my-app',
  }
)

const consumer = kafka.consumer({ groupId: 'group_id' })
const kafkaServer = {
  // Connect the consumer, subscribe to the `todo` topic and store each message as a todo.
  run: async () => {
    await consumer.connect()
    await consumer.subscribe({ topic: 'todo', fromBeginning: true })

    await consumer.run({
      eachMessage: async ({ partition, message }) => {
        const value = message.value.toString()
        console.log({ partition, offset: message.offset, value })
        try {
          const payload = JSON.parse(value)
          todoStore.addTodo({
            description: payload.description,
            completed: false,
          })
        } catch (error) {
          // Log the cause together with the raw message; the error is deliberately not rethrown.
          console.error('Unable to process message', value, error)
        }
      },
    })
  },
}

module.exports = { kafkaServer }

Add Zipkin server

Update docker-compose to include Zipkin

docker-compose.yml

# Local Kafka broker, the Zookeeper it connects to, and a Zipkin server.
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
  kafka:
    image: wurstmeister/kafka
    ports:
      # Published on the host as localhost:9092, the broker address both apps use.
      - '9092:9092'
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      # Points the broker at the zookeeper service defined above.
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  zipkin:
    image: openzipkin/zipkin
    container_name: zipkin
    ports:
      # Port of the http://localhost:9411/api/v2/spans endpoint configured in both tracers.
      - '9411:9411'

Test it

We should now be able to run the Web App and the Service App.

# Start Kafka, Zookeeper and Zipkin from docker-compose.yml
docker-compose up
# Start the Web App (port 3000)
yarn dev:webapp
# Start the Service App (port 3001)
yarn dev:service
# Create a todo through the Web App's POST /todos endpoint
curl --request POST \
  --url http://localhost:3000/todos \
  --header 'content-type: application/json' \
  --data '{"description":"remember this"}'

Copyright © 2020 Code Green LLC