
Tracing in Node

By James Kolean on Sep 19, 2020
Source repository: https://gitlab.com/jameskolean/tracing-node
Tags: Microservice, Docker, NodeJS, JavaScript, React

Let’s do some tracing in Node. (I have a post on Tracing in SpringBoot here.) Tracing is an essential part of a microservice architecture: you need to be able to examine an event’s lifecycle. Events flow through many microservices, transforming, forking, and merging. In a monolith, we can log events to one log file and see everything that is going on. In a microservice architecture, we need a way to tie together the log messages generated by a single event. In this example, we will use Zipkin to record the traces. Let’s get started.

Testbed

To test our tracing, we need to create two microservices, a Web App and a Service App.

The Web App will have two REST endpoints:

  • GET http://localhost:3000/todos returns the list of Todos via a call to the Service App
  • POST http://localhost:3000/todos takes a payload of {"description": "remember something"} and adds the Todo by publishing a message to a Kafka topic that the Service App consumes.

The Service App will have one REST endpoint (GET http://localhost:3001/todos, which returns the Todos from an in-memory store) and a Kafka consumer that listens on the todo topic and adds new Todos to that store. The curl sketch below shows how both apps can be exercised once everything is running.
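
For reference, here is how those endpoints can be exercised with curl once both apps are running (same ports and payloads as above):

# List Todos through the Web App (it fetches them from the Service App)
curl http://localhost:3000/todos

# Create a Todo through the Web App (it publishes a Kafka message)
curl --request POST \
  --url http://localhost:3000/todos \
  --header 'content-type: application/json' \
  --data '{"description":"remember something"}'

# Read the Todos directly from the Service App
curl http://localhost:3001/todos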

Create the Web App and Service App

Get started

mkdir node-tracing
cd node-tracing
npm init -y
yarn add koa koa-body koa-combine-routers koa-router node-fetch uuid kafkajs

Add these scripts

package.json

"scripts": {
    "dev:webapp": "npx nodemon webapp/server.js",
    "start:webapp": "node webapp/server.js",
    "dev:service": "npx nodemon service/server.js",
    "start:service": "node service/server.js",
    "test": "echo \"Error: no test specified\" && exit 1"
  },

Web App

Create an entry point.

webapp/server.js

const Koa = require('koa')
const router = require('./routes/router')
const bodyParser = require('koa-body')
const {
    kafkaServer
} = require('./messaging/kafka')

const app = new Koa()

app.use(bodyParser())
app.use(router())
kafkaServer.run().catch(console.error)
app.listen(3000)

Add the routes.

webapp/routes/router.js

const TodoRouter = require('./todos')
const combineRouters = require('koa-combine-routers')

const router = combineRouters(TodoRouter)

module.exports = router

webapp/routes/todos.js

const Router = require('koa-router')
const fetch = require('node-fetch')
const url = 'http://localhost:3001/todos'
const {
    v4: uuidv4
} = require('uuid')
const {
    producer
} = require('../messaging/kafka')

const router = Router({
    prefix: '/todos'
})

router.get('/', async (ctx) => {
    try {
        const response = await fetch(url)
        const json = await response.json()
        ctx.body = json
    } catch (error) {
        console.log(error)
        ctx.body = 'failure'
    }
})

router.post('/', async (ctx) => {
    const todoMessage = {
        description: ctx.request.body.description,
        transactionId: uuidv4(),
    }
    console.log('hello', todoMessage)
    await producer.send({
        topic: 'todo',
        messages: [{
            value: JSON.stringify(todoMessage)
        }],
    })

    ctx.body = {
        success: true
    }
})

module.exports = router

Add Kafka

webapp/messaging/kafka.js

const {
    Kafka
} = require('kafkajs')
const queueName = 'createTodoQueue'
const kafkaBrokerUrl = 'localhost:9092'

const kafka = new Kafka({
    clientId: queueName,
    brokers: [kafkaBrokerUrl],
})

const producer = kafka.producer()
const kafkaServer = {
    // Connect the producer once at startup so the routes can send messages.
    run: async () => {
        await producer.connect()
    },
}

module.exports = {
    kafkaServer,
    producer
}

Service App

Create an entry point.

service/server.js

const Koa = require('koa')
const router = require('./routes/router')
const {
    kafkaServer
} = require('./messaging/kafka')

const app = new Koa()

app.use(router())
kafkaServer.run().catch(console.error)

app.listen(3001)

Add the routes.

service/routes/router.js

const TodoRouter = require('./todos')
const combineRouters = require('koa-combine-routers')

const router = combineRouters(TodoRouter)

module.exports = router

service/routes/todos.js

const Router = require('koa-router')
const todoStore = require('../todoStore')

const router = Router({
    prefix: '/todos'
})

router.get('/', async (ctx) => {
    ctx.body = todoStore.getTodos()
})

module.exports = router

Add Kafka

service/messaging/kafka.js

const {
    Kafka
} = require('kafkajs')
const todoStore = require('../todoStore')

const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['localhost:9092'],
})

const consumer = kafka.consumer({
    groupId: 'group_id'
})
const kafkaServer = {
    run: async () => {
        // Consuming
        await consumer.connect()
        await consumer.subscribe({
            topic: 'todo',
            fromBeginning: true
        })

        await consumer.run({
            eachMessage: async ({
                topic,
                partition,
                message
            }) => {
                console.log({
                    partition,
                    offset: message.offset,
                    value: message.value.toString(),
                })
                try {
                    const payload = JSON.parse(message.value.toString())
                    const newTodo = {
                        description: payload.description,
                        completed: false,
                    }
                    todoStore.addTodo(newTodo)
                } catch (error) {
                    console.error('Unable to process message', message.value.toString())
                }
            },
        })
    },
}

module.exports = {
    kafkaServer
}

Add a Todo store

service/todoStore.js

let todos = [{
        description: 'Get Milk',
        completed: false
    },
    {
        description: 'Get Bread',
        completed: false
    },
]
const todoStore = {
    getTodos: () => {
        return todos
    },
    addTodo: (todo) => {
        console.log('createTodos', todo)
        todos.push(todo)
    },
}

module.exports = todoStore

Docker Kafka

Use Docker to run Kafka and ZooKeeper with docker-compose.

docker-compose.yml

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
  kafka:
    image: wurstmeister/kafka
    ports:
      - '9092:9092'
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
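
Once the stack is started (docker-compose up in the next section), a couple of standard docker-compose commands are handy for confirming that ZooKeeper and Kafka actually came up:

docker-compose ps
docker-compose logs kafka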

Test it

We should now be able to run Kafka, the Web App, and the Service App, and create a Todo.

docker-compose up
yarn dev:webapp
yarn dev:service
curl --request POST \
  --url http://localhost:3000/todos \
  --header 'content-type: application/json' \
  --data '{"description":"remember this"}'
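
To confirm the message made it through Kafka and into the Service App's store, fetch the list back. With the seed data in service/todoStore.js, the response should look roughly like this:

curl http://localhost:3000/todos
# [{"description":"Get Milk","completed":false},
#  {"description":"Get Bread","completed":false},
#  {"description":"remember this","completed":false}]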

Add Zipkin

We are finally ready to add tracing. We need these dependencies.

yarn add zipkin zipkin-context-cls zipkin-transport-http zipkin-instrumentation-fetch zipkin-instrumentation-kafkajs zipkin-instrumentation-koa

Create Tracers.

Tracers send trace events to Zipkin. Note that in the Web App we also wrap fetch as zipkinServiceFetch so the outbound HTTP call to the Service App is traced.

/webapp/tracer.js

const {
    Tracer,
    BatchRecorder,
    jsonEncoder: {
        JSON_V2
    },
} = require('zipkin')
const CLSContext = require('zipkin-context-cls')
const {
    HttpLogger
} = require('zipkin-transport-http')
const fetch = require('node-fetch')
const wrapFetch = require('zipkin-instrumentation-fetch')
const serviceName = 'my-webapp'

const ctxImpl = new CLSContext('zipkin')
const recorder = new BatchRecorder({
    logger: new HttpLogger({
        endpoint: 'http://localhost:9411/api/v2/spans',
        jsonEncoder: JSON_V2,
    }),
})

const tracer = new Tracer({
    recorder,
    ctxImpl,
    localServiceName: serviceName,
})

const remoteServiceName = 'my-service'
const zipkinServiceFetch = wrapFetch(fetch, {
    tracer,
    remoteServiceName
})

module.exports = {
    tracer,
    serviceName,
    zipkinServiceFetch
}

/service/tracer.js

const {
    Tracer,
    BatchRecorder,
    jsonEncoder: {
        JSON_V2
    },
} = require('zipkin')
const CLSContext = require('zipkin-context-cls')
const {
    HttpLogger
} = require('zipkin-transport-http')
const serviceName = 'my-service'

const ctxImpl = new CLSContext('zipkin')
const recorder = new BatchRecorder({
    logger: new HttpLogger({
        endpoint: 'http://localhost:9411/api/v2/spans',
        jsonEncoder: JSON_V2,
    }),
})

const tracer = new Tracer({
    recorder,
    ctxImpl,
    localServiceName: serviceName,
})

module.exports = {
    tracer,
    serviceName
}

Connect the tracers with Koa middleware.

/webapp/server.js

const Koa = require('koa')
const router = require('./routes/router')
const bodyParser = require('koa-body')
const {
    koaMiddleware
} = require('zipkin-instrumentation-koa')
const {
    kafkaServer
} = require('./messaging/kafka')
const {
    tracer,
    serviceName
} = require('./tracer')

const app = new Koa()

app.use(bodyParser())
app.use(koaMiddleware({
    tracer,
    serviceName
}))
app.use(router())
kafkaServer.run().catch(console.error)
app.listen(3000)

/service/server.js

const Koa = require('koa')
const router = require('./routes/router')
const {
    koaMiddleware
} = require('zipkin-instrumentation-koa')
const {
    kafkaServer
} = require('./messaging/kafka')
const {
    tracer,
    serviceName
} = require('./tracer')

const app = new Koa()

app.use(koaMiddleware({
    tracer,
    serviceName
}))
app.use(router())
kafkaServer.run().catch(console.error)

app.listen(3001)

Update the Web App routes and Kafka.

In the Todo route, we replace the fetch implementation with an instrumented one.

webapp/routes/todos.js

const Router = require('koa-router')
const {
    zipkinServiceFetch: fetch
} = require('../tracer')
const url = 'http://localhost:3001/todos'
const {
    v4: uuidv4
} = require('uuid')
const {
    producer
} = require('../messaging/kafka')

const router = Router({
    prefix: '/todos'
})

router.get('/', async (ctx) => {
    try {
        const response = await fetch(url)
        const json = await response.json()
        ctx.body = json
    } catch (error) {
        console.log(error)
        ctx.body = 'failure'
    }
})

router.post('/', async (ctx) => {
    const todoMessage = {
        description: ctx.request.body.description,
        transactionId: uuidv4(),
    }
    console.log('hello', todoMessage)
    await producer.send({
        topic: 'todo',
        messages: [{
            value: JSON.stringify(todoMessage)
        }],
    })

    ctx.body = {
        success: true
    }
})

module.exports = router

For Kafka, we likewise replace the plain client with an instrumented one.

webapp/messaging/kafka.js

const {
    Kafka
} = require('kafkajs')
const instrumentKafkaJs = require('zipkin-instrumentation-kafkajs')
const {
    tracer
} = require('../tracer')

const kafka = instrumentKafkaJs(
    new Kafka({
        clientId: 'my-app',
        brokers: ['localhost:9092'],
    }), {
        tracer, // Your zipkin tracer instance
        remoteServiceName: 'my-app', // This should be the symbolic name of the broker, not a consumer.
    }
)

const producer = kafka.producer()
const kafkaServer = {
    // Connect the producer once at startup so the routes can send messages.
    run: async () => {
        await producer.connect()
    },
}

module.exports = {
    kafkaServer,
    producer
}

Update the Service App's Kafka module in the same way, wrapping the client with the Zipkin instrumentation.

service/messaging/kafka.js

const {
    Kafka
} = require('kafkajs')
const todoStore = require('../todoStore')
const instrumentKafkaJs = require('zipkin-instrumentation-kafkajs')
const {
    tracer
} = require('../tracer')

const kafka = instrumentKafkaJs(
    new Kafka({
        clientId: 'my-app',
        brokers: ['localhost:9092'],
    }), {
        tracer, // Your zipkin tracer instance
        remoteServiceName: 'serviceName', // This should be the symbolic name of the broker, not a consumer.
    }
)

const consumer = kafka.consumer({
    groupId: 'group_id'
})
const kafkaServer = {
    run: async () => {
        // Consuming
        await consumer.connect()
        await consumer.subscribe({
            topic: 'todo',
            fromBeginning: true
        })

        await consumer.run({
            eachMessage: async ({
                topic,
                partition,
                message
            }) => {
                console.log({
                    partition,
                    offset: message.offset,
                    value: message.value.toString(),
                })
                try {
                    const payload = JSON.parse(message.value.toString())
                    const newTodo = {
                        description: payload.description,
                        completed: false,
                    }
                    todoStore.addTodo(newTodo)
                } catch (error) {
                    console.error('Unable to process message', message.value.toString())
                }
            },
        })
    },
}

module.exports = {
    kafkaServer
}

Add Zipkin server

Update docker-compose to include Zipkin

docker-compose.yml

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
  kafka:
    image: wurstmeister/kafka
    ports:
      - '9092:9092'
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  zipkin:
    image: openzipkin/zipkin
    container_name: zipkin
    ports:
      - '9411:9411'

Test it

We should now be able to restart everything and exercise the endpoints again; this time the requests will show up as traces in Zipkin (see below).

docker-compose up
yarn dev:webapp
yarn dev:service
curl --request POST \
  --url http://localhost:3000/todos \
  --header 'content-type: application/json' \
  --data '{"description":"remember this"}'
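
The POST above produces a trace that spans the Kafka producer in the Web App and the consumer in the Service App, while a GET produces a trace across the instrumented fetch call:

curl http://localhost:3000/todos

Open the Zipkin UI at http://localhost:9411 and search by the service names configured in the tracers (my-webapp and my-service) to inspect the traces and their spans.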