自定义RxBus,RxManager with RxJava2

TT_123456789 2020-08-14 18:33:44 2283

现在查到的关于RxBus, RxManager的封装大部分是基于RxJava1的,从RxJava1到RxJava2的变化很大,很难平滑地过度,所以自己根据RxJava2重新封装下RxBus和RxManager

RxBus.java
public class RxBus {

private static RxBus instance;

/**
 * ConcurrentHashMap: 线程安全集合
 *   Subject 同时充当了Observer和Observable的角色
 */
@SuppressWarnings("rawtypes")
private ConcurrentHashMap<Object, List<Subject>> subjectMapper = new ConcurrentHashMap<>();

public static synchronized RxBus getInstance() {
    if(null == instance) {
        instance = new RxBus();
    }
    return instance;
}

private RxBus() {
}

/**
 * 订阅事件源
 *
 * @param observable
 * @param consumer
 * @return
 */
public RxBus onEvent(Observable<?> observable, Consumer<Object> consumer) {
    observable.observeOn(AndroidSchedulers.mainThread())
            .subscribe(consumer, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    throwable.printStackTrace();
                }
            });
    return getInstance();
}

/**
 * 注册事件源
 *
 * @param tag key
 * @param <T>
 * @return
 */
@SuppressWarnings({"rawtypes"})
public <T> Observable<T> register(@NonNull Object tag) {
    List<Subject> subjectList = subjectMapper.get(tag);
    if(null == subjectList) {
        subjectList = new ArrayList<>();
        subjectMapper.put(tag, subjectList);
    }

    Subject<T> subject = PublishSubject.create();
    subjectList.add(subject);
    // LogUtil.log("register" + tag + " size:" + subjectList.size());
    return subject;
}

/**
 * 取消整个tag的监听
 *
 * @param tag key
 */
@SuppressWarnings("rawtypes")
public void unregister(@NonNull Object tag) {
    List<Subject> subjectList = subjectMapper.get(tag);
    if(null != subjectList) {
        subjectMapper.remove(tag);
    }
}

/**
 * 取消tag里某个observable的监听
 *
 * @param tag key
 * @param observable 要删除的observable
 * @return
 */
@SuppressWarnings("rawtypes")
public RxBus unregister(@NonNull Object tag,
                        @NonNull Observable<?> observable) {
    if(null == observable) {
        return getInstance();
    }

    List<Subject> subjectList = subjectMapper.get(tag);
    if(null != subjectList) {
        // 从subjectList中删去observable
        subjectList.remove((Subject<?>) observable);
        // 若此时subjectList为空则从subjectMapper中删去
        if(isEmpty(subjectList)) {
            subjectMapper.remove(tag);
        }
    }
    return getInstance();
}

/**
 * 触发事件
 *
 * @param content
 */
public void post(@NonNull Object content) {
    post(content.getClass().getName(), content);
}

/**
 * 触发事件
 *
 * @param tag key
 * @param content
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public void post(@NonNull Object tag, @NonNull Object content) {
    List<Subject> subjectList = subjectMapper.get(tag);
    if(!isEmpty(subjectList)) {
        for(Subject subject : subjectList) {
            subject.onNext(content);
        }
    }
}

/**
 * 判断集合是否为空
 *
 * @param collection 集合
 * @return
 */
@SuppressWarnings("rawtypes")
public static boolean isEmpty(Collection<Subject> collection) {
    return null == collection || collection.isEmpty();
}

}

RxManager.java
public class RxManager {

public RxBus mRxBus = RxBus.getInstance();

/**

  • 管理观察源
    */
    private Map<String, Observable<?>> mObservableMap = new HashMap<>();

/**

  • 管理订阅者
    */
    private CompositeDisposable mCompositeSubscription = new CompositeDisposable();

public void on(String eventName, Consumer consumer) {
// 注册
Observable<?> mObservable = mRxBus.register(eventName);

mObservableMap.put(eventName, mObservable);

mCompositeSubscription
        .add(mObservable.observeOn(AndroidSchedulers.mainThread())
                .subscribe(consumer, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        throwable.printStackTrace();
                    }
                }));

}

/**

  • 添加订阅者到mCompositeSubscription
  • @param m 要添加的订阅者
    */
    public void add(Disposable m) {
    mCompositeSubscription.add(m);
    }

/**

  • 取消所有注册
    */
    public void clear() {
    // 取消订阅
    mCompositeSubscription.dispose();
    for(Map.Entry<String, Observable<?>> entry : mObservableMap.entrySet()) {
    // 取消注册
    mRxBus.unregister(entry.getKey(), entry.getValue());
    }
    }

/**

  • 触发事件
  • @param tag
  • @param content
    */
    public void post(Object tag, Object content) {
    mRxBus.post(tag, content);
    }

原文链接:点击这里

声明:本文内容由易百纳平台入驻作者撰写,文章观点仅代表作者本人,不代表易百纳立场。如有内容侵权或者其他问题,请联系本站进行删除。
红包 点赞 收藏 评论 打赏
评论
0个
内容存在敏感词
手气红包
    易百纳技术社区暂无数据
相关专栏
置顶时间设置
结束时间
删除原因
  • 广告/SPAM
  • 恶意灌水
  • 违规内容
  • 文不对题
  • 重复发帖
打赏作者
易百纳技术社区
TT_123456789
您的支持将鼓励我继续创作!
打赏金额:
¥1易百纳技术社区
¥5易百纳技术社区
¥10易百纳技术社区
¥50易百纳技术社区
¥100易百纳技术社区
支付方式:
微信支付
支付宝支付
易百纳技术社区微信支付
易百纳技术社区
打赏成功!

感谢您的打赏,如若您也想被打赏,可前往 发表专栏 哦~

举报反馈

举报类型

  • 内容涉黄/赌/毒
  • 内容侵权/抄袭
  • 政治相关
  • 涉嫌广告
  • 侮辱谩骂
  • 其他

详细说明

审核成功

发布时间设置
发布时间:
是否关联周任务-专栏模块

审核失败

失败原因
备注
拼手气红包 红包规则
祝福语
恭喜发财,大吉大利!
红包金额
红包最小金额不能低于5元
红包数量
红包数量范围10~50个
余额支付
当前余额:
可前往问答、专栏板块获取收益 去获取
取 消 确 定

小包子的红包

恭喜发财,大吉大利

已领取20/40,共1.6元 红包规则

    易百纳技术社区