之前事件总线都是用EventBus或者otto来做,现在RxJava越来越火了,用RxJava实现事件总线也是很方便的.不多说直接看代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class RxBus { private static volatile RxBus defaultInstance; private final Subject bus; public RxBus () { bus = new SerializedSubject <>(PublishSubject.create()); } public static RxBus getDefault () { RxBus rxBus = defaultInstance; if (defaultInstance == null ) { synchronized (RxBus.class) { rxBus = defaultInstance; if (defaultInstance == null ) { rxBus = new RxBus (); defaultInstance = rxBus; } } } return rxBus; } public void post (Object o) { bus.onNext(o); } public <T> Observable<T> toObserverable (Class<T> eventType) { return bus.ofType(eventType); } }
使用方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 rxSubscription = RxBus.getDefault().toObserverable(UserEvent.class) .subscribe(new Action1 <UserEvent>() { @Override public void call (UserEvent userEvent) { long id = userEvent.getId(); String name = userEvent.getName(); ... } }, new Action1 <Throwable>() { @Override public void call (Throwable throwable) { } });
上面RxBus实现代码引用自:
用RxJava实现事件总线(Event Bus)](http://www.jianshu.com/p/ca090f6e2fe2/ ) 更详细信息可以点击查看
发现上面RxJava实现的RxBus确实很方便,代码也挺简洁.但是有个问题是我在使用EventBus以及otto的时候一直存在的疑问,就是当发送相同类型的事件或者消息的时候接收的时候怎么去区分?
基于这个问题我对上面的RxBus进行了改造.代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 package com.cm.rxbus;import rx.Observable;import rx.functions.Func1;import rx.subjects.PublishSubject;import rx.subjects.SerializedSubject;import rx.subjects.Subject;public class RxBus { private static volatile RxBus defaultInstance; private final Subject bus; public RxBus () { bus = new SerializedSubject <>(PublishSubject.create()); } public static RxBus getDefault () { RxBus rxBus = defaultInstance; if (defaultInstance == null ) { synchronized (RxBus.class) { rxBus = defaultInstance; if (defaultInstance == null ) { rxBus = new RxBus (); defaultInstance = rxBus; } } } return rxBus; } public void post (Object o) { bus.onNext(o); } public <T> Observable<T> toObservable (Class<T> eventType) { return bus.ofType(eventType); } public void post (int code, Object o) { bus.onNext(new Message (code,o)); } public <T> Observable<T> toObservable (final int code, final Class<T> eventType) { return bus.ofType(Message.class) .filter(new Func1 <Message,Boolean>() { @Override public Boolean call (Message o) { return o.getCode() == code && eventType.isInstance(o.getObject()); } }).map(new Func1 <Message,Object>() { @Override public Object call (Message o) { return o.getObject(); } }).cast(eventType); } }
Message代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package com.cm.rxbus;public class Message { private int code; private Object object; public Message (<span />) {} public Message (int code, Object o) { this .code = code; this .object = o; } }
RxBus里添加了一个post(int code, Object o)
方法,里面调用了bus.onNext(new Message(code,o));
将code跟object用Message类进行了封装.
toObservable(final int code, final Class<T> eventType)
对传入code的事件进行分发,先调用bus.ofType(Message.class)
返回Message类的观察者,然后通过filter
操作符返回Message里code跟Object类型跟传入的类型都匹配的观察者,再通过map
操作符返回Message里的object对象,最后通过cast
转化为特定类的观察者.
使用方法:
1 2 3 4 5 6 7 8 9 10 11 12 Subscription subscription = RxBus.getDefault() .toObservable(100 ,String.class) .subscribe(new Action1 <String>() { @Override public void call (String s) { Log.e("CM" , s); } }); RxBus.getDefault().post(100 , "123456" );
这样如果有多个消息是相同类型的话就可以通过不同的code进行区分了.
最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。
1 2 3 4 5 6 7 @Override protected void onDestroy () { super .onDestroy(); if (!subscription .isUnsubscribed()) { subscription .unsubscribe(); } }