package cn.sinata.rxnetty; import android.app.*; import android.content.Context; import android.content.Intent; import android.graphics.Color; import android.os.Build; import android.os.IBinder; import android.util.Log; import androidx.annotation.Nullable; import androidx.core.app.NotificationCompat; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.concurrent.TimeUnit; import cn.sinata.rxnetty.netStatus.NetChangeObserver; import cn.sinata.rxnetty.netStatus.NetStateReceiver; import cn.sinata.rxnetty.netStatus.NetUtils; import cn.sinata.rxnetty.pipeline.LengthFieldConfigurator; import io.netty.buffer.ByteBuf; import io.reactivex.netty.RxNetty; import io.reactivex.netty.channel.ObservableConnection; import rx.Observable; import rx.Subscriber; import rx.functions.Action1; import rx.functions.Func1; import rx.schedulers.Schedulers; /** * 长链接基本service。修改为监听器(接口方式)实现。 */ public class CoreService extends Service { @Nullable @Override public IBinder onBind(Intent intent) { return null; } @Override public void onCreate() { super.onCreate(); } void init() { /* * 注册发送消息事件监听。 */ NettyClient.getInstance().setSendListener(new OnSendListener() { @Override public void onSend(String s) { //先检查通道状态,如果断开,则重连。 checkState(); Observable send = send(s + "\n"); // Observable send = send(s); if (send != null) { send.subscribe(new Subscriber() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(Void aVoid) { } }); } } }); initCheckOb(); } public void checkState() { if (mConnection == null || mConnection.getChannel() == null || !mConnection.getChannel().isActive() || !mConnection.getChannel().isWritable()) { reConnect(); } } void initCheckOb() { NettyClient.getInstance().setOnCheckListener(new OnCheckListener() { @Override public void doCheck() { checkState(); } }); } protected NetChangeObserver mNetChangeObserver = null; @Override public int onStartCommand(Intent intent, int flags, int startId) { init(); NetStateReceiver.registerNetworkStateReceiver(this); mNetChangeObserver = new NetChangeObserver() { @Override public void onNetConnected(NetUtils.NetType type) { super.onNetConnected(type); checkState(); } }; NetStateReceiver.registerObserver(mNetChangeObserver); if (Config.isStartForeground) { startForeground(Config.NOTIFICATION_ID,getNotification()); } return START_NOT_STICKY; } private Notification getNotification(){ if (android.os.Build.VERSION.SDK_INT >= android.os.Build.VERSION_CODES.O) { //8.0以上创建channel NotificationManager notificationManager = (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE); if (notificationManager != null) { NotificationChannel channel; channel = new NotificationChannel("101", "服务", NotificationManager.IMPORTANCE_DEFAULT); channel.enableLights(true); channel.setLightColor(Color.GREEN); channel.setShowBadge(true); notificationManager.createNotificationChannel(channel); } } Context context = this; NotificationCompat.Builder mBuilder = new NotificationCompat.Builder(context, "101"); mBuilder.setShowWhen(true); mBuilder.setAutoCancel(false); int icon = context.getResources().getIdentifier("icon_pt", "mipmap", context.getPackageName()); mBuilder.setSmallIcon(icon); mBuilder.setContentText("正在运行"); int name = context.getResources().getIdentifier("app_name", "string", context.getPackageName()); mBuilder.setContentTitle(getString(name)); String action = getPackageName() + ".ACTION.CLICK"; Intent intent = new Intent(action); intent.setClassName(getPackageName(), getPackageName() + ".broadcast.MBroadcastReceiver"); PendingIntent contentIntent = PendingIntent.getBroadcast(context, 0, intent, PendingIntent.FLAG_UPDATE_CURRENT); mBuilder.setContentIntent(contentIntent); return mBuilder.build(); } private boolean isDestroy = false; @Override public void onDestroy() { super.onDestroy(); isDestroy = true; NetStateReceiver.unRegisterNetworkStateReceiver(this); if (subscriber != null && !subscriber.isUnsubscribed()) { subscriber.unsubscribe(); } if (receiveSub != null && !receiveSub.isUnsubscribed()) { receiveSub.unsubscribe(); } if (mConnection != null) { mConnection.close(); } } private void connectServer() { if (NetUtils.isNetworkAvailable(this)) { connectionSub(); connect(Config.SOCKET_SERVER, Config.SOCKET_PORT) .subscribeOn(Schedulers.io()) .subscribe(subscriber); } } private Subscriber subscriber; private void connectionSub() { if (subscriber != null) { if (!subscriber.isUnsubscribed()) { subscriber.unsubscribe(); } subscriber = null; } subscriber = new Subscriber() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { e.printStackTrace(); reConnect(); } @Override public void onNext(Boolean aBoolean) { Observable observable = receive(); if (observable != null) { if (receiveSub == null || receiveSub.isUnsubscribed()) { receiveSub = null; initReceiveOb(); } observable.subscribe(receiveSub); } } }; } private Subscriber receiveSub; private void initReceiveOb() { receiveSub = new Subscriber() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { reConnect(); } @Override public void onNext(ByteBuf byteBuf) { String s = byteBuf.toString(Charset.forName("utf-8")); boolean contains = s.contains("�"); if (contains) { //乱码了 s = byteBuf.toString(Charset.forName("gbk")); } ArrayList listeners = NettyClient.getInstance().getListeners(); if (listeners != null) { for (OnMessageListener listener : listeners) { if (listener != null) { listener.onMessageReceived(s); } } } } }; } private void reConnect() { if (isDestroy) { return; } //reconnect Observable.timer(3, TimeUnit.SECONDS).subscribe(new Action1() { @Override public void call(Long aLong) { if (mConnection != null) { mConnection.close(); mConnection = null; } Log.i("CoreService", "sendHeart_websocket长连接断开,重新连接"); connectServer(); } }); } ObservableConnection mConnection; public Observable connect(final String url, final int port) { return RxNetty.createTcpClient(url, port, new LengthFieldConfigurator()) .connect() .flatMap(new Func1, Observable>() { @Override public Observable call(ObservableConnection byteBufByteBufObservableConnection) { mConnection = byteBufByteBufObservableConnection; OnConnectListener listener = NettyClient.getInstance().getConnectListener(); if (listener != null) { listener.onConnected(); } return Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { subscriber.onNext(true); } }); } }); } public Observable receive() { if (mConnection != null) { return mConnection.getInput(); } return null; } public Observable send(String s) { if (mConnection != null) { return mConnection.writeBytesAndFlush(s.getBytes()); } return null; } }