本文共 2479 字,大约阅读时间需要 8 分钟。
前文作者讲述了BottledWater-PG安装部署,并在pg中实现了数据改变,向kafka发送消息的案例,详细参考。此前作者写过一篇pg的异步消息实现的实时地图应用案例,本文将改用BottledWater-PG实现一遍。
var fs = require('fs');var http = require('http');var socket = require('socket.io');var Kafka = require('node-rdkafka'); var server = http.createServer(function(req, res) { res.writeHead(200, { 'Content-type': 'text/html'}); res.end(fs.readFileSync(__dirname + '/index.html')); }).listen(8081, function() { console.log('Listening at: http://localhost:8081');});//注册socket.iovar socketio=socket.listen(server);socketio.on('connection', function (socketclient) { console.log('已连接socket:'); //socketclient.broadcast.emit('GPSCoor', data.payload);//广播给别人 //socketclient.emit('GPSCoor', data.payload);//广播给自己 });var consumer = new Kafka.KafkaConsumer({ //'debug': 'all', 'metadata.broker.list': '192.168.43.27:9092', 'group.id': 'node-rdkafka-consumer-flow-example', 'enable.auto.commit': false});var topicName = 'gps';//logging debug messages, if debug is enabled consumer.on('event.log', function(log) { console.log(log);});//打印错误consumer.on('error', function(err) { console.error('Error from consumer'); console.error(err);});consumer.on('ready', function(arg) { console.log('consumer ready.' + JSON.stringify(arg)); consumer.subscribe([topicName]); //准备消费消息 consumer.consume();});consumer.on('data', function(m) { console.log(m); let _data; if(m.value==null)//delete操作发送来的消息 { _data=JSON.parse(m.key); _data.tg_op='delete'; } else{ _data=m.value.toString(); _data=JSON.parse(_data); } console.log(_data); socketio.emit('GPSCoor', _data);//广播给所有的客户端});consumer.on('disconnected', function(arg) { console.log('consumer disconnected. ' + JSON.stringify(arg));});//启动consumer.connect();
实时地图应用
mcsas=# insert into gps(name,geom) values ('opy','Point(118 31.5)');INSERT 0 1mcsas=# insert into gps(name,geom) values ('ty','Point(117 30.5)');INSERT 0 1
mcsas=# update gps set geom='Point(115 40)' where name='opy';UPDATE 1
mcsas=# delete from gps where name='opy';DELETE 1
BottledWater-PG主要作用是将pg库中的表的增删改的消息都发往了kafka,应用程序并没有直接连接数据库,而是直接去消费kafka的消息。在表发生insert,update,delete能获取消息,但是truncate table并未向kafka生成消息,不知是否是我哪里遗漏。
作者之前曾使用pg自带的notify与listen实现异步消息发送,该方法借助了表的触发器实现。应用程序是直连数据库且数据增删改都会走触发器。 匆忙中,作者并未对比两者之间孰优孰劣,但一个直连库,一个间接消费,在不同需求中可选择一个比较符合要求的方案而加以应用。转载地址:http://sepia.baihongyu.com/