consumer代码如下
var http = require('http');
http.createServer(function (request, response) {
// 发送 HTTP 头部
// HTTP 状态值: 200 : OK
// 内容类型: text/plain
response.writeHead(200, {'Content-Type': 'text/plain'});
var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
var Client = kafka.Client;
var client = new Client("localhost:2181");
var consumer = new Consumer(
client,
[
{ topic: 'fltest'}
],
{
autoCommit: false
}
);
consumer.on('message',function(message){
console.log(message);
});
consumer.on('error',function(err){
console.log(err);
});
// 发送响应数据 "Hello World"
response.end('Hello World\n');
}).listen(9999);
// 终端打印如下信息
console.log('Server running at http://127.0.0.1:9999/');
Consumer老是报如下错误
E:\node_modules\.1.0.7@kafka-node\lib\codec\snappy.js:9
throw new Error('Snappy codec is not installed');
^
Error: Snappy codec is not installed
at Object.unavailableCodec (E:\node_modules\.1.0.7@kafka-node\lib\codec\snap
py.js:9:11)
at Object.<anonymous> (E:\node_modules\.1.0.7@kafka-node\lib\protocol\protoc
ol.js:183:17)
at Object.self.tap (E:\node_modules\.0.3.0@binary\index.js:248:12)
at decodeMessageSet (E:\node_modules\.1.0.7@kafka-node\lib\protocol\protocol
.js:161:8)
at Object.<anonymous> (E:\node_modules\.1.0.7@kafka-node\lib\protocol\protoc
ol.js:128:26)
at Object.self.tap (E:\node_modules\.0.3.0@binary\index.js:248:12)
at Object.decodePartitions (E:\node_modules\.1.0.7@kafka-node\lib\protocol\p
rotocol.js:123:8)
at Object.self.loop (E:\node_modules\.0.3.0@binary\index.js:267:16)
at Object.<anonymous> (E:\node_modules\.1.0.7@kafka-node\lib\protocol\protoc
ol.js:57:8)
at Object.self.loop (E:\node_modules\.0.3.0@binary\index.js:267:16)
但在我的当前目录下有一个.5.0.5@snappy。我知道是怎么回事
在安装snapyy的时候也提示
E:\node_modules\.5.0.5@snappy>node "C:\Users\Administrator\AppData\Roaming\npm\n
ode_modules\cnpm\node_modules\npminstall\node-gyp-bin\\node-gyp.js" rebuild
在此解决方案中一次生成一个项目。若要启用并行生成,请添加“/m”开关。
CL : fatal error C1510: Cannot load language resource clui.dll. [E:\node_module
s\.5.0.5@snappy\build\deps\snappy\snappy.vcxproj]
gyp ERR! build error
gyp ERR! stack Error: `C:\Windows\Microsoft.NET\Framework\v4.0.30319\msbuild.exe
` failed with exit code: 1
gyp ERR! stack at ChildProcess.onExit (C:\Users\Administrator\AppData\Roamin
g\npm\node_modules\cnpm\node_modules\node-gyp\lib\build.js:276:23)
gyp ERR! stack at emitTwo (events.js:106:13)
gyp ERR! stack at ChildProcess.emit (events.js:191:7)
gyp ERR! stack at Process.ChildProcess._handle.onexit (internal/child_proces
s.js:215:12)
gyp ERR! System Windows_NT 6.1.7601
gyp ERR! command "D:\\Application\\Node.js\\node.exe" "C:\\Users\\Administrator\
\AppData\\Roaming\\npm\\node_modules\\cnpm\\node_modules\\npminstall\\node-gyp-b
in\\node-gyp.js" "rebuild"
gyp ERR! cwd E:\node_modules\.5.0.5@snappy
gyp ERR! node -v v6.9.3
gyp ERR! node-gyp -v v3.4.0
gyp ERR! not ok
Error: post install error, please remove node_modules before retry!
Run "C:\Windows\system32\cmd.exe /d /s /c node-gyp rebuild" error, exit code 1
at ChildProcess.proc.on.code (C:\Users\Administrator\AppData\Roaming\npm\nod
e_modules\cnpm\node_modules\runscript\index.js:67:21)
at emitTwo (events.js:106:13)
at ChildProcess.emit (events.js:191:7)
at maybeClose (internal/child_process.js:885:16)
at Process.ChildProcess._handle.onexit (internal/child_process.js:226:5)
npminstall version: 2.16.0
npminstall args: E:\node.exe C:\Users\Administrator\AppData\Roaming\npm\node_mod
ules\cnpm\node_modules\npminstall\bin\install.js --china --userconfig=C:\Users\A
dministrator\.cnpmrc --disturl=https://npm.taobao.org/mirrors/node --registry=ht
tps://registry.npm.taobao.org snappy
新写的代码如下
var http = require('http');
http.createServer(function (request, response) {
// 发送 HTTP 头部
// HTTP 状态值: 200 : OK
// 内容类型: text/plain
response.writeHead(200, {'Content-Type': 'text/plain'});
var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
//HighLevelConsumer = kafka.HighLevelConsumer,
client = new kafka.Client(),
consumer = new Consumer(
client,
payloads = [
{ topic:'fltest',partition:0},
{ topic:'fltest',partition:1},
{ topic:'fltest',partition:2}
],
options = {
groupId: 'kafka-node-group',
autoCommit: false,
fetchMaxWaitMs: 100,
fetchMinBytes: 1,
fetchMaxBytes: 1024 * 1024,
fromOffset: false,
encoding: 'utf8'
}
);
//consumer = new HighLevelConsumer(
//client,
//[
//{topic:'fltest'}
//],
//{
//groupId: 'kafka-node-group',
//autoCommit: false,
//fetchMaxWaitMs: 100,
//fetchMinBytes: 1,
//fetchMaxBytes: 1024 * 1024,
//fromOffset: false,
//encoding: 'utf8'
//}
//);
consumer.on('message', function (message) {
console.log(message);
});
consumer.on('error', function (err) {
//console.log(err);
});
//consumer.on('offsetOutOfRange', function (err) {
//console.log(err);
//});
consumer.close(function(){});
// 发送响应数据 "Hello World"
response.end('Hello World\n');
}).listen(9999);
// 终端打印如下信息
console.log('Server running at http://127.0.0.1:9999/');
现在报错又变成了
E:\node_modules\.0.2.2@node-zookeeper-client\lib\ConnectionManager.js:624
if (!this.socket.write(packet.request.toBuffer())) {
^
TypeError: Cannot read property 'write' of undefined
at ConnectionManager.onPacketQueueReadable (E:\node_modules\.0.2.2@node-zook
eeper-client\lib\ConnectionManager.js:624:25)
at emitNone (events.js:86:13)
at PacketQueue.emit (events.js:185:7)
at PacketQueue.push (E:\node_modules\.0.2.2@node-zookeeper-client\lib\Packet
Queue.js:35:10)
at ConnectionManager.queue (E:\node_modules\.0.2.2@node-zookeeper-client\lib
\ConnectionManager.js:711:30)
at ConnectionManager.close (E:\node_modules\.0.2.2@node-zookeeper-client\lib
\ConnectionManager.js:248:10)
at Client.close (E:\node_modules\.0.2.2@node-zookeeper-client\index.js:229:2
8)
at Zookeeper.close (E:\node_modules\.1.0.7@kafka-node\lib\zookeeper.js:468:1
5)
at Client.close (E:\node_modules\.1.0.7@kafka-node\lib\client.js:163:11)
at Consumer.close (E:\node_modules\.1.0.7@kafka-node\lib\consumer.js:264:17)