最近用C#客户端调用Curator RPC Proxy服务,远程与zookeeper通信。客户端与RPC Proxy之间通信用Thrift,Thrift文件是从官网下载且能正常编译使用。
实现以下两个功能:
(1)监听zookeeper中特定node事件,包括增、删、改等信息;
(2)从客户端按下按钮,增加一个新node。
现在第一个功能已正常实现,第二个在铵下后出现如下错误,且界面死机。
自己添加的所有代码如下:
public partial class Form1 : Form
{
//CuratorService
CuratorService.Client curatorService = null;
EventService.Client eventService = null;
CuratorProjection curatorProjection = null;
CuratorProjection projection2 = null;
//TTransport type
TTransport transport = null;
TProtocol protocol = null;
//Thread operation function
public static void eventCycle( object obj, object obj2, object obj3){
CuratorProjection projection = (CuratorProjection)obj;
EventService.Client eventService = (EventService.Client)obj2;
CuratorService.Client curatorService = (CuratorService.Client)obj3;
PathChildrenCacheProjection cacheProjection = curatorService.startPathChildrenCache(
projection,
"/root",
true,
false,
PathChildrenCacheStartMode.BUILD_INITIAL_CACHE
);
while(Thread.CurrentThread.IsAlive){
try
{
CuratorEvent remoteEvent = eventService.getNextEvent(projection);
if (CuratorEventType.PING == remoteEvent.Type)
{
Console.WriteLine("Heartbeat");
}
else if (CuratorEventType.PATH_CHILDREN_CACHE == remoteEvent.Type)
{
if (PathChildrenCacheEventType.CHILD_ADDED == remoteEvent.ChildrenCacheEvent.Type)
{
ChildData child = remoteEvent.ChildrenCacheEvent.Data;
String path = child.Path;
Console.WriteLine("Children Added " + path);
}
else if (PathChildrenCacheEventType.CHILD_REMOVED == remoteEvent.ChildrenCacheEvent.Type)
{
ChildData child = remoteEvent.ChildrenCacheEvent.Data;
String path = child.Path;
Console.WriteLine("Child Deleted: " + path);
}
}
}catch( TTransportException transError)
{
Console.WriteLine( transError.Data );
}catch( TException tError )
{
if( tError is TApplicationException )
Console.WriteLine(tError.Data);
}
}
}
public Form1()
{
InitializeComponent();
transport = new TSocket( "127.0.0.1", 1234 );
transport.Open();
protocol = new TBinaryProtocol( transport );
curatorService = new CuratorService.Client( protocol );
eventService = new EventService.Client( protocol );
curatorProjection = curatorService.newCuratorProjection("main");
projection2 = curatorService.newCuratorProjection("main");
Thread eventListener = new Thread(delegate() { eventCycle(curatorProjection, eventService, curatorService); });
eventListener.Start();
}
private void button1_Click(object sender, EventArgs e)
{
try
{
CreateSpec node = new CreateSpec();
node.Path = "/root/xm";
node.Data = Encoding.Default.GetBytes("XM");
node.Mode = CreateMode.PERSISTENT_SEQUENTIAL;
node.CreatingParentsIfNeeded = true;
OptionalPath path = curatorService.createNode( projection2, node );
if(null != path)
Console.WriteLine( path.ToString() );
}catch( TTransportException error){
Console.WriteLine( error.Data );
}catch( TApplicationException tError){
Console.WriteLine( tError.Data );
}
}
}