之前写了一篇《用RxJava实现事件总线RxBus并实现同类型事件的区分》,用RxJava实现了事件总线RxBus,并实现了通过code对相同类型的事件进行区分。但是发现使用起来还是不太方便,不如EventBus那样好用。
我们来看一下之前是怎么用的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
// Old-style usage: subscribe by hand to String events that were posted with code 100.
Subscription subscription = RxBus.getDefault()
        .toObservable(100, String.class)
        .subscribe(new Action1<String>() {
            @Override
            public void call(String message) {
                Log.e("CM", message);
            }
        });

// Post a String event tagged with code 100.
RxBus.getDefault().post(100, "123456");

// The subscription has to be released manually when the Activity goes away.
@Override
protected void onDestroy() {
    super.onDestroy();
    if (!subscription.isUnsubscribed()) {
        subscription.unsubscribe();
    }
}
|
确实没有EventBus用起来方便。于是对之前的RxBus又进行了改造,参考了EventBus的使用方法。
添加了注解类@Subscribe,用于对方法进行注解:
1 2 3 4 5 6 7
| @Documented @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface Subscribe { int code() default -1; ThreadMode threadMode() default ThreadMode.CURRENT_THREAD; }
|
code用于区分相同类型的事件,threadMode用来设置事件处理所在的线程。
ThreadMode代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/**
 * Thread on which a {@code @Subscribe} handler should be invoked.
 */
public enum ThreadMode {

    /** No thread switch requested; RxBus maps this to {@code Schedulers.immediate()}. */
    CURRENT_THREAD,

    /** Android main thread; RxBus maps this to {@code AndroidSchedulers.mainThread()}. */
    MAIN,

    /** A new thread; RxBus maps this to {@code Schedulers.newThread()}. */
    NEW_THREAD
}
|
另外添加了一个SubscriberMethod类,用于保存@Subscribe注解的方法的相关信息(参考EventBus的SubscriberMethod)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class SubscriberMethod { public Method method; public ThreadMode threadMode; public Class<?> eventType; public Object subscriber; public int code;
public SubscriberMethod(Object subscriber, Method method, Class<?> eventType, int code,ThreadMode threadMode) { this.method = method; this.threadMode = threadMode; this.eventType = eventType; this.subscriber = subscriber; this.code = code; }
public void invoke(Object o){ try { method.invoke(subscriber, o); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } }
}
|
接下来看看重点RxBus的代码:

| package com.cm.rxbus;
import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map;
import rx.Observable; import rx.Subscription; import rx.android.schedulers.AndroidSchedulers; import rx.functions.Action1; import rx.functions.Func1; import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; import rx.subjects.SerializedSubject; import rx.subjects.Subject;
/**
 * Event bus backed by an RxJava 1.x {@link SerializedSubject}.
 *
 * <p>Events can be consumed either as an {@link Observable} (see the {@code toObservable}
 * overloads) or through {@link Subscribe}-annotated methods wired up by
 * {@link #register(Object)} and released by {@link #unRegister(Object)}.
 *
 * <p>Posting is serialized by the subject. The registration bookkeeping uses a plain
 * {@link HashMap}, so {@code register}, {@code addSubscriber} and {@code unRegister} should
 * be called from a single thread.
 */
public class RxBus {

    /** Value of {@link Subscribe#code()} meaning the handler listens for events posted without a code. */
    private static final int NO_CODE = -1;

    private static volatile RxBus defaultInstance;

    /** Live subscriptions, grouped by the subscriber object that owns them. */
    private final Map<Object, List<Subscription>> subscriptionsBySubscriber = new HashMap<>();

    private final Subject<Object, Object> bus;

    public RxBus() {
        bus = new SerializedSubject<>(PublishSubject.create());
    }

    /**
     * Returns the process-wide bus, creating it on first use.
     *
     * @return the shared default instance
     */
    public static RxBus getDefault() {
        RxBus instance = defaultInstance;
        if (instance == null) {
            synchronized (RxBus.class) {
                instance = defaultInstance;
                if (instance == null) {
                    instance = new RxBus();
                    defaultInstance = instance;
                }
            }
        }
        return instance;
    }

    /**
     * Posts an event without a code.
     *
     * @param o the event
     */
    public void post(Object o) {
        bus.onNext(o);
    }

    /**
     * Observes every event that is an instance of the given type.
     *
     * @param eventType type of the events to receive
     * @return an observable emitting the matching events
     */
    public <T> Observable<T> toObservable(Class<T> eventType) {
        return bus.ofType(eventType);
    }

    /**
     * Posts an event tagged with a code. The event travels over the bus wrapped in a
     * {@code Message} carrying the code and the payload.
     *
     * @param code code used to tell apart events of the same type
     * @param o    the event
     */
    public void post(int code, Object o) {
        bus.onNext(new Message(code, o));
    }

    /**
     * Observes events that were posted with the given code and whose payload is an
     * instance of the given type.
     *
     * @param code      code the event must have been posted with
     * @param eventType type of the payloads to receive
     * @return an observable emitting the unwrapped payloads
     */
    public <T> Observable<T> toObservable(final int code, final Class<T> eventType) {
        return bus.ofType(Message.class)
                .filter(new Func1<Message, Boolean>() {
                    @Override
                    public Boolean call(Message message) {
                        return message.getCode() == code
                                && eventType.isInstance(message.getObject());
                    }
                })
                .map(new Func1<Message, Object>() {
                    @Override
                    public Object call(Message message) {
                        return message.getObject();
                    }
                })
                .cast(eventType);
    }

    /**
     * Subscribes every {@link Subscribe}-annotated method declared by the subscriber's class
     * that takes exactly one parameter. Annotated methods with any other parameter count are
     * ignored.
     *
     * <p>Registering an object that already owns subscriptions is a no-op; otherwise each
     * event would be delivered to it more than once.
     *
     * @param subscriber object whose annotated methods should receive events
     */
    public void register(Object subscriber) {
        if (subscriptionsBySubscriber.containsKey(subscriber)) {
            return;
        }
        for (Method method : subscriber.getClass().getDeclaredMethods()) {
            if (!method.isAnnotationPresent(Subscribe.class)) {
                continue;
            }
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length != 1) {
                continue;
            }
            Subscribe annotation = method.getAnnotation(Subscribe.class);
            addSubscriber(new SubscriberMethod(subscriber, method, parameterTypes[0],
                    annotation.code(), annotation.threadMode()));
        }
    }

    /**
     * Subscribes a single handler method to the bus and records the subscription under the
     * handler's subscriber so that {@link #unRegister(Object)} can release it.
     *
     * @param subscriberMethod description of the handler to subscribe
     */
    public void addSubscriber(final SubscriberMethod subscriberMethod) {
        Observable<?> events;
        if (subscriberMethod.code == NO_CODE) {
            events = toObservable(subscriberMethod.eventType);
        } else {
            events = toObservable(subscriberMethod.code, subscriberMethod.eventType);
        }

        Subscription subscription = postToObservable(events, subscriberMethod)
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object event) {
                        // Each subscription drives exactly its own handler. Fanning out to
                        // every registered handler here would deliver an event once per
                        // subscription per handler.
                        // The exact-class check keeps the dispatch rule of the former
                        // map lookup by event.getClass().
                        if (event.getClass() == subscriberMethod.eventType) {
                            subscriberMethod.invoke(event);
                        }
                    }
                });

        List<Subscription> owned = subscriptionsBySubscriber.get(subscriberMethod.subscriber);
        if (owned == null) {
            owned = new ArrayList<>();
            subscriptionsBySubscriber.put(subscriberMethod.subscriber, owned);
        }
        owned.add(subscription);
    }

    /**
     * Applies the handler's thread mode to the observable.
     *
     * <p>{@code observeOn} does not modify the receiver; it returns a new observable, and
     * that returned observable is what must be subscribed to.
     *
     * @throws IllegalStateException if the thread mode is not one of the known constants
     */
    private <T> Observable<T> postToObservable(Observable<T> observable,
                                               SubscriberMethod subscriberMethod) {
        switch (subscriberMethod.threadMode) {
            case MAIN:
                return observable.observeOn(AndroidSchedulers.mainThread());
            case NEW_THREAD:
                return observable.observeOn(Schedulers.newThread());
            case CURRENT_THREAD:
                return observable.observeOn(Schedulers.immediate());
            default:
                throw new IllegalStateException(
                        "Unknown thread mode: " + subscriberMethod.threadMode);
        }
    }

    /**
     * Cancels every subscription owned by the given subscriber and forgets the subscriber.
     * Subscriptions of other subscribers are left untouched, even when they listen for the
     * same event types. Unknown subscribers are ignored.
     *
     * @param subscriber object previously passed to {@link #register(Object)}
     */
    public void unRegister(Object subscriber) {
        List<Subscription> owned = subscriptionsBySubscriber.remove(subscriber);
        if (owned == null) {
            return;
        }
        for (Subscription subscription : owned) {
            if (!subscription.isUnsubscribed()) {
                subscription.unsubscribe();
            }
        }
    }
}
|
register的时候获取@Subscribe注解的方法的相关信息并保存到map,post事件触发的时候调用@Subscribe注解的方法并传入参数。
unRegister的时候会移除保存的subscriber、subscriberMethod,以及取消Subscription对事件的订阅。
再来看使用方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| package com.cm.simple;
import android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.util.Log; import android.view.View;
import com.cm.rxbus.RxBus; import com.cm.rxbus.Subscribe; import com.cm.rxbus.ThreadMode;
import butterknife.ButterKnife; import butterknife.OnClick;
public class MainActivity extends AppCompatActivity {
@Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); ButterKnife.bind(this); RxBus.getDefault().register(this); }
@Subscribe public void onEvent(String s) { Log.e("CMAD", "------>" + s); }
@Subscribe public void onEvent(EventA eventA) { Log.e("CMAD", "---onEvent EventA-->" + eventA.text); }
@Subscribe(code = 102) public void onEventWithCode(EventA eventA) { Log.e("CMAD", "---onEvent EventA 102-->" + eventA.text); }
@Subscribe(code = 103, threadMode = ThreadMode.MAIN) public void onEventWithCodeAndThreadMode(EventA eventA) { Log.e("CMAD", "---onEvent EventB 103--->" + eventA.text); }
@Override protected void onDestroy() { super.onDestroy(); RxBus.getDefault().unRegister(this); }
@OnClick({R.id.btn1, R.id.btn2, R.id.btn3, R.id.btn4}) public void onClick(View view) { switch (view.getId()) { case R.id.btn1: RxBus.getDefault().post("123456"); break; case R.id.btn2: RxBus.getDefault().post(new EventA()); break; case R.id.btn3: RxBus.getDefault().post(102, new EventA("event code 102")); break; case R.id.btn4: RxBus.getDefault().post(103, new EventA("event code 103")); break; } }
class EventA { public String text = "BeanA";
public EventA() {
}
public EventA(String text) { this.text = text; }
@Override public String toString() { return "EventA{" + "text='" + text + ''' + '}'; } } }
|
使用方法是不是跟EventBus很像?同样使用@Subscribe注解,通过threadMode设置事件处理在什么线程执行,并添加了code对相同类型的事件进行区分。
整个工程可以移步GitHub查看:RxBus
项目已上传到jcenter,在Gradle中可以直接使用 compile 'com.cm:rxbus:1.0'