失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > etcd学习和实战:4 Java使用etcd实现服务发现和管理

etcd学习和实战:4 Java使用etcd实现服务发现和管理

时间:2019-07-25 22:12:17

相关推荐

etcd学习和实战:4 Java使用etcd实现服务发现和管理

etcd学习和实战:4、Java使用etcd实现服务发现和管理

文章目录

etcd学习和实战:4、Java使用etcd实现服务发现和管理1. 前言2. 代码2.1 服务注册2.2 服务发现2.3 运行结果2.4 问题3. 最后

1. 前言

Java一般使用zookeeper来实现分布式系统下服务管理,zookeeper也具备key-value的存取功能,这里我们不讨论zookeeper和etcd的优劣,只提一下对于Java实现类似功能可能也有zookeeper这样的方案。

同样分为服务注册和发现两大部分,思路和go实现时相同,所以直接上代码并进行测试即可。

2. 代码

使用了jetcd,目前还是beta版本,但是目前似乎只有这个支持etcd v3版本。

参考自:

/etcd-io/jetcd/tree/master/jetcd-examples

https://xinchen./article/details/115434576

/p/bd7eed1f250c

2.1 服务注册

设置endpoints(端点)并创建etcd客户端设置租约注册服务并绑定持续监听租约

Register.java:

import io.etcd.jetcd.ByteSequence;import io.etcd.jetcd.Client;import io.etcd.jetcd.KV;import io.etcd.jetcd.Lease;import io.etcd.jetcd.Response;import io.etcd.jetcd.lease.LeaseKeepAliveResponse;import io.etcd.jetcd.options.PutOption;import io.grpc.stub.CallStreamObserver;import static mon.base.Charsets.UTF_8;public class Register {private Client client;private String endpoints;private Object lock = new Object();public Register(String endpoints) {super();this.endpoints = endpoints;}/*** 新建key-value客户端实例* @return*/private KV getKVClient(){if (null==client) {synchronized (lock) {if (null==client) {client = Client.builder().endpoints(endpoints.split(",")).build();}}}return client.getKVClient();}public void close() {client.close();client = null;}public Response.Header put(String key, String value) throws Exception {return getKVClient().put(bytesOf(key), bytesOf(value)).get().getHeader();}/*** 将字符串转为客户端所需的ByteSequence实例* @param val* @return*/public static ByteSequence bytesOf(String val) {return ByteSequence.from(val, UTF_8);}private Client getClient() {if (null==client) {synchronized (lock) {if (null==client) {client = Client.builder().endpoints(endpoints.split(",")).build();}}}return client;}public void putWithLease(String key, String value) throws Exception {Lease leaseClient = getClient().getLeaseClient();leaseClient.grant(60).thenAccept(result -> {// 租约IDlong leaseId = result.getID();// 准备好put操作的clientKV kvClient = getClient().getKVClient();// put操作时的可选项,在这里指定租约IDPutOption putOption = PutOption.newBuilder().withLeaseId(leaseId).build();// put操作kvClient.put(bytesOf(key), bytesOf(value), putOption).thenAccept(putResponse -> {// put操作完成后,再设置无限续租的操作leaseClient.keepAlive(leaseId, new CallStreamObserver<LeaseKeepAliveResponse>() {@Overridepublic boolean isReady() {return false;}@Overridepublic void setOnReadyHandler(Runnable onReadyHandler) {}@Overridepublic void disableAutoInboundFlowControl() {}@Overridepublic void request(int count) {}@Overridepublic void setMessageCompression(boolean enable) {}/*** 每次续租操作完成后,该方法都会被调用* @param value*/@Overridepublic void onNext(LeaseKeepAliveResponse value) {System.out.println("续租完成");}@Overridepublic void onError(Throwable t) {System.out.println(t);}@Overridepublic void onCompleted() {}});});});}}

RegisterTest.java:

public class RegisterTest {public static void main(String[] args) {Register register = new Register("http://localhost:2379");String key = "/web/node0";String value = "localhost:7999";// try {// register.put(key, value);// } catch (Exception e) {// e.printStackTrace();// }try {register.putWithLease(key, value);} catch (Exception e) {e.printStackTrace();}}}

2.2 服务发现

设置endpoints创建etcd客户端初始化配置并监听服务前缀(实际上也可以直接监听key,但不够灵活,监听前缀值更好一些)根据监听到的对key的操作类型进行进一步处理

Discovery.java:

import java.nio.charset.StandardCharsets;import java.util.HashMap;import java.util.List;import java.util.Map;import java.pletableFuture;import java.util.concurrent.ExecutionException;import io.etcd.jetcd.ByteSequence;import io.etcd.jetcd.Client;import io.etcd.jetcd.KV;import io.etcd.jetcd.KeyValue;import io.etcd.jetcd.Response;import io.etcd.jetcd.Watch;import io.etcd.jetcd.Watch.Watcher;import io.etcd.jetcd.kv.GetResponse;import io.etcd.jetcd.options.GetOption;import io.etcd.jetcd.options.WatchOption;import io.etcd.jetcd.watch.WatchEvent;import static java.nio.charset.StandardCharsets.UTF_8;public class Discovery {private Client client;private String endpoints;private final Object lock = new Object();private HashMap<String, String> serverList = new HashMap<String, String>();/*** 发现服务类信息初始化* @param endpoints:监听端点,包含ip和端口,如:"http://localhost:2379“,多个端点则使用逗号分割, 比如:”http://localhost:2379,http://192.168.2.1:2330“*/public Discovery(String endpoints) {this.endpoints = endpoints;newServiceDiscovery();}public Client newServiceDiscovery() {if (null == client) {synchronized (lock) {if (null == client) {client = Client.builder().endpoints(endpoints.split(",")).build();}}}return client;}public void watchService(String prefixAddress) {//请求当前前缀CompletableFuture<GetResponse> getResponseCompletableFuture =client.getKVClient().get(ByteSequence.from(prefixAddress,UTF_8),GetOption.newBuilder().withPrefix(ByteSequence.from(prefixAddress, UTF_8)).build());try {//获取当前前缀下的服务并存储List<KeyValue> kvs = getResponseCompletableFuture.get().getKvs();for (KeyValue kv : kvs) {setServerList(kv.getKey().toString(UTF_8), kv.getValue().toString(UTF_8));}//创建线程监听前缀new Thread(new Runnable() {@Overridepublic void run() {watcher(prefixAddress);}}).start();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}private void watcher(String prefixAddress) {Watcher watcher;System.out.println("watching prefix:" + prefixAddress);WatchOption watchOpts = WatchOption.newBuilder().withPrefix(ByteSequence.from(prefixAddress,UTF_8)).build();//实例化一个监听对象,当监听的key发生变化时会被调用Watch.Listener listener = Watch.listener(watchResponse -> {watchResponse.getEvents().forEach(watchEvent -> {WatchEvent.EventType eventType = watchEvent.getEventType();KeyValue keyValue = watchEvent.getKeyValue();System.out.println("type="+eventType+",key="+keyValue.getKey().toString(UTF_8)+",value="+keyValue.getValue().toString(UTF_8));switch (eventType) {case PUT: //修改或者新增setServerList(keyValue.getKey().toString(UTF_8), keyValue.getValue().toString(UTF_8));break;case DELETE: //删除delServerList(keyValue.getKey().toString(UTF_8), keyValue.getValue().toString(UTF_8));break;}});});client.getWatchClient().watch(ByteSequence.from(prefixAddress, UTF_8), watchOpts,listener);}private void setServerList(String key, String value) {synchronized (lock) {serverList.put(key, value);System.out.println("put key:" + key + ",value:" + value);}}private void delServerList(String key, String value) {synchronized (lock) {serverList.remove(key);System.out.println("del key:" + key);}}public void close() {client.close();client = null;}}

DiscoveryTest.java:

public class DiscoveryTest {public static void main(String[] args) {String endpoints = "http://localhost:2379";Discovery ser = new Discovery(endpoints);ser.watchService("/web/");ser.watchService("/grpc/");while (true) {}}}

2.3 运行结果

//先运行服务发现$ java -jar discovery.jar put key:/web/node0,value:localhost:7999watching prefix:/web/watching prefix:/grpc/type=PUT,key=/web/node0,value=localhost:7999put key:/web/node0,value:localhost:7999...//再运行服务注册$ java -jar register.jar 续租完成续租完成续租完成续租完成...

2.4 问题

六月 07, 4:53:24 下午 io.grpc.internal.ManagedChannelImpl$NameResolverListener handleErrorInSyncContext警告: [Channel<1>: (etcd)] Failed to resolve name. status=Status{code=NOT_FOUND, description=null, cause=null}...

这个是endpoints错误导致的,需要在前面添加http://,即"http://localhost:2379"而不是"localhost:2379"。

3. 最后

实际生产环境中目前etcd+grpc更适合Go,java目前仅有jetcd支持gRPC,更多的还是v2版本,使用的gRPC的版本也比较底,还处于beat版本。

如果觉得《etcd学习和实战:4 Java使用etcd实现服务发现和管理》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。