package com.example.administrator.ximengjianyu.mqtt.push; import android.app.Activity; import android.app.AlarmManager; import android.app.PendingIntent; import android.app.Service; import android.content.Context; import android.content.Intent; import android.net.ConnectivityManager; import android.net.NetworkInfo; import android.os.Handler; import android.os.IBinder; import android.support.annotation.IntDef; import android.util.Log; import android.widget.Toast; import com.cm.utils.net.HttpUtils; import com.cm.utils.net.response.GsonResponseHandler; import com.example.administrator.ximengjianyu.activity.LoginActivity; import com.example.administrator.ximengjianyu.beans.GuanDuanPhoneBean; import com.example.administrator.ximengjianyu.beans.WebCTIStatus; import com.example.administrator.ximengjianyu.global.Constants; import com.example.administrator.ximengjianyu.global.UserInfo; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence; import org.json.JSONException; import org.json.JSONObject; import java.util.HashMap; import java.util.Locale; import java.util.Map; public class MQTTService extends Service implements MqttCallback { public Context mActivity; // public static String BASE_URL = "http://192.168.0.106:8083/"; //崔宝城测试IP // //测试IP // public static final String BROKER_URL = "tcp://192.168.0.106:1883"; //上线IP public static final String BROKER_URL = "tcp://192.168.133.150:1883"; public static final String clientId = "android-client"; public static final String TOPIC = "warnings"; public static final String USERNAME = "cmkj"; public static final String PASSWORD = "cmkj"; private MqttClient mqttClient; public static final String DEBUG_TAG = "MQService"; // Debug TAG private static final boolean MQTT_CLEAN_SESSION = true; // Start a clean session? private static final java.lang.String CLINET_ID = "liujun";// client id public static final String ACTION_START = DEBUG_TAG + ".START"; // Action to start public static final String ACTION_STOP = DEBUG_TAG + ".STOP"; // Action to stop public static final String ACTION_KEEPALIVE = DEBUG_TAG + ".KEEPALIVE"; // Action to keep alive used by alarm manager public static final String ACTION_RECONNECT = DEBUG_TAG + ".RECONNECT"; // Action to reconnect private MqttConnectOptions mOpts; // Connection Options private AlarmManager mAlarmManager; // Alarm manager to perform repeating tasks private ConnectivityManager mConnectivityManager; // To check for connectivity changes private boolean mStarted = false; // Is the Client started? private static final int MQTT_PORT = 1883; // Broker Port private static final String MQTT_BROKER = "192.168.80.238"; // Broker URL or IP Address private static final String MQTT_URL_FORMAT = "tcp://%s:%d"; public static final int MQTT_QOS_0 = 0;// URL Format normally don't change private static final int MQTT_KEEP_ALIVE_QOS = MQTT_QOS_0; // Default Keepalive QOS private MemoryPersistence mMemStore; // On Fail reverts to MemoryStore private static final int MQTT_KEEP_ALIVE = 1000; // KeepAlive Interval in MS private static final String MQTT_KEEP_ALIVE_MESSAGE = "hello word"; // Keep Alive message to send /** * 保持长连接的主题 */ private MqttTopic mKeepAliveTopic; // Instance Variable for Keepalive topic private Handler mConnHandler; private MqttConnectOptions ss; private String extensionNumber; private String token; /** * 服务的绑定 * * @param intent * @return */ @Override public IBinder onBind(Intent intent) { throw new UnsupportedOperationException("Not yet implemented"); } //=============================下面的是向外暴露的方法================== /** * 开始连接ActiveMQ服务器 * * @param ctx */ public static void actionStart(Context ctx) { Intent i = new Intent(ctx, MQTTService.class); i.setAction(ACTION_START); ctx.startService(i); } /** * 停止连接ActiveMQ服务器 * * @param ctx */ public static void actionStop(Context ctx) { Intent i = new Intent(ctx, MQTTService.class); i.setAction(ACTION_STOP); ctx.startService(i); } //=============================下面的是服务生命周期的方法================== /** * 服务初始化回调函数 */ @Override public void onCreate() { super.onCreate(); /**创建一个Handler*/ mConnHandler = new Handler(); /**连接的参数选项*/ mOpts = new MqttConnectOptions(); /**删除以前的Session*/ mOpts.setCleanSession(MQTT_CLEAN_SESSION); // Do not set keep alive interval on mOpts we keep track of it with alarm's /**定时器用来实现心跳*/ mAlarmManager = (AlarmManager) getSystemService(ALARM_SERVICE); /**管理网络连接*/ mConnectivityManager = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE); } /** * 服务器启动的回调的函数 * * @param intent * @param flags * @param startId * @return */ @Override public int onStartCommand(Intent intent, int flags, int startId) { super.onStartCommand(intent, flags, startId); String action = intent.getAction(); Log.i(DEBUG_TAG, "Received action of " + action); extensionNumber = UserInfo.getExtensionNumber(this); // Log.e("MQTTService",extensionNumber); token = UserInfo.getToken(this); if (action == null) { Log.i(DEBUG_TAG, "Starting service with no action\n Probably from a crash"); } else { if (action.equals(ACTION_START)) { Log.i(DEBUG_TAG, "Received ACTION_START"); start();/**开始连接*/ } else if (action.equals(ACTION_STOP)) { stop();/**停止连接*/ } else if (action.equals(ACTION_KEEPALIVE)) { // keepAlive();/**保持连接*/ } else if (action.equals(ACTION_RECONNECT)) { if (isNetworkAvailable()) { reconnectIfNecessary();/**重新连接*/ } } } return START_REDELIVER_INTENT; } //=============================下面的三个方法是实现了MqttCallback接口重写的方法================== /** * 连接失败 * * @param throwable */ @Override public void connectionLost(Throwable throwable) { stopKeepAlives(); mqttClient = null; /**检查网络是否可用*/ if (isNetworkAvailable()) { reconnectIfNecessary(); } } //=============================下面的方法是其它的方法================== /** * 检查网络是否可用 * to return the current connected state * * @return boolean true if we are connected false otherwise */ private boolean isNetworkAvailable() { NetworkInfo info = mConnectivityManager.getActiveNetworkInfo(); return (info == null) ? false : info.isConnected(); } private synchronized void start() { /**判断是否已经连接到服务器*/ if (mStarted) { Log.i(DEBUG_TAG, "Attempt to start while already started"); return; } /**判断是否还在保持长连接*/ if (hasScheduledKeepAlives()) { /**断开长连接*/ // stopKeepAlives(); } connect(); /**注册一个监听网路连接改变的方法*/ // registerReceiver(mConnectivityReceiver,new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION)); } /** * Query's the AlarmManager to check if there is * a keep alive currently scheduled * * @return true if there is currently one scheduled false otherwise */ private synchronized boolean hasScheduledKeepAlives() { Intent i = new Intent(); i.setClass(this, MQTTService.class); i.setAction(ACTION_KEEPALIVE); PendingIntent pi = PendingIntent.getBroadcast(this, 0, i, PendingIntent.FLAG_NO_CREATE); return (pi != null) ? true : false; } /** * Connects to the broker with the appropriate datastore */ private synchronized void connect() { /**创建一个url*/ String url = String.format(Locale.US, MQTT_URL_FORMAT, MQTT_BROKER, MQTT_PORT); Log.i(DEBUG_TAG, "Connecting with URL: " + url); /**创建一个MQClient*/ try { mqttClient = new MqttClient(BROKER_URL, token, new MemoryPersistence()); } catch (MqttException e) { e.printStackTrace(); } new Thread() { @Override public void run() { super.run(); try { mOpts.setUserName(USERNAME); mOpts.setPassword(PASSWORD.toCharArray()); mqttClient.connect(mOpts); /**开始订阅*/ mqttClient.subscribe(extensionNumber); /**订阅的回调*/ mqttClient.setCallback(MQTTService.this); mStarted = true; // Service is now connected Log.i(DEBUG_TAG, "Successfully connected and subscribed starting keep alives"); /**保持长连接*/ startKeepAlives(); } catch (MqttException e) { e.printStackTrace(); } } }.start(); } /** * Attempts to stop the Mqtt client * as well as halting all keep alive messages queued * in the alarm manager */ private synchronized void stop() { if (!mStarted) { Log.i(DEBUG_TAG, "Attemtpign to stop connection that isn't running"); return; } if (mqttClient != null) { mConnHandler.post(new Runnable() { @Override public void run() { try { mqttClient.disconnect(0); } catch (MqttException ex) { ex.printStackTrace(); } mqttClient = null; mStarted = false; /**停止保持心跳,长连接*/ stopKeepAlives(); } }); } // unregisterReceiver(mConnectivityReceiver); } /** * 开始定时 * Schedules keep alives via a PendingIntent * in the Alarm Manager */ private void startKeepAlives() { Intent i = new Intent(); i.setClass(this, MQTTService.class); i.setAction(ACTION_KEEPALIVE); PendingIntent pi = PendingIntent.getService(this, 0, i, 0); mAlarmManager.setRepeating(AlarmManager.RTC_WAKEUP, System.currentTimeMillis() + MQTT_KEEP_ALIVE, MQTT_KEEP_ALIVE, pi); } /** * 取消定时 * Cancels the Pending Intent * in the alarm manager */ private void stopKeepAlives() { Intent i = new Intent(); i.setClass(this, MQTTService.class); i.setAction(ACTION_KEEPALIVE); PendingIntent pi = PendingIntent.getService(this, 0, i, 0); mAlarmManager.cancel(pi); } // /** // * 保持长连接 // * in the broker // */ // private synchronized void keepAlive() { // /**如果还在与ActiveMQ服务器连接*/ // if (isConnected()) { // try { //// startKeepAlives(); // /**发送一个心跳包,保持与服务器长连接*/ //// sendKeepAlive(); // return; // } catch (MqttConnectivityException ex) { // ex.printStackTrace(); // /**如果保持长连接出现异常,重新连接*/ // reconnectIfNecessary(); // } catch (MqttPersistenceException ex) { // ex.printStackTrace(); // stop(); // } catch (MqttException ex) { // ex.printStackTrace(); // stop(); // } // } // } /** * 检查连接情况,如果没有连接就自动连接 * and reconnects if it is required. */ private synchronized void reconnectIfNecessary() { if (mStarted && mqttClient == null) { connect(); } } /** * 检查与AcitiveMQ服务器的网络连接状态 * * @return true if its a match we are connected false if we aren't connected */ private boolean isConnected() { if (mStarted && mqttClient != null && !mqttClient.isConnected()) { Log.i(DEBUG_TAG, "Mismatch between what we think is connected and what is connected"); } if (mqttClient != null) { return (mStarted && mqttClient.isConnected()) ? true : false; } return false; } /** * 发送一个心跳包,保持长连接 * * @return MqttDeliveryToken specified token you can choose to wait for completion */ // private synchronized MqttDeliveryToken sendKeepAlive() // throws MqttConnectivityException, MqttPersistenceException, MqttException { // if (!isConnected()) // throw new MqttConnectivityException(); // // if (mKeepAliveTopic == null) { // mKeepAliveTopic = mqttClient.getTopic(extensionNumber); // } // MqttMessage message = new MqttMessage(MQTT_KEEP_ALIVE_MESSAGE.getBytes()); // message.setQos(MQTT_KEEP_ALIVE_QOS); // /**发送一个心跳包给服务器,然后回调到:messageArrived 方法中*/ // return mKeepAliveTopic.publish(message); // } /** * MqttConnectivityException Exception class */ private class MqttConnectivityException extends Exception { private static final long serialVersionUID = -7385866796799469420L; } @Override public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception { String tag = new String(message.getPayload()); Log.e("tag", "messageArrived: " + new String(message.getPayload(), "utf-8")); MessageBean bean = parse(tag); Intent intent = new Intent(); intent.putExtra("Bean", bean); intent.setAction("com.jianyu.open"); sendBroadcast(intent); } @Override public void deliveryComplete(MqttDeliveryToken token) { } private MessageBean parse(String tag) throws JSONException { MessageBean bean = new MessageBean(); JSONObject obj = new JSONObject(tag); bean.setContent(obj.getString("content")); bean.setExtensionNumber(obj.getString("extensionNumber")); bean.setMessageFlag(obj.getString("messageFlag")); bean.setRemark(obj.getString("remark")); bean.setTitle(obj.getString("title")); bean.setSendTime(obj.getString("sendTime")); bean.setSextensionState(obj.getString("sextensionState")); return bean; } }