MQTT协议特点
MQTT是一个由IBM主导开发的物联网传输协议,它被设计用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。它的核心设计思想是开源、可靠、轻巧、简单,具有以下主要的几项特性:
- 非常小的通信开销(最小的消息大小为 2 字节);
- 支持各种流行编程语言(包括C,Java,Ruby,Python 等等)且易于使用的客户端;
- 支持发布 / 预定模型,简化应用程序的开发;
- 提供三种不同消息传递等级,让消息能按需到达目的地,适应在不稳定工作的网络传输需求
- 应项目要求使用MQTT协议实现客户端与服务端通信。Android端使用MqttAndroidClient实现MQTT通信:MqttAndroidClient
MqttAndroidClient配置
在项目根目录下的build.gradle中添加:
repositories {
maven {
url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
}
}
在app目录下的build.gradle中添加:
dependencies {
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
//eventbus 各个组件之间进行通信
implementation 'org.greenrobot:eventbus:3.1.1'
}
在AndroidManifest.xml中添加:
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.READ_PHONE_STATE" />
<uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" />
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_WIFI_STATE" />
在AndroidManifest.xml中添加:
<service
android:name=".MQTTService"
android:enabled="true"
android:exported="true" />
<service android:name="org.eclipse.paho.android.service.MqttService" />
MQTTService.java
import android.app.Service;
import android.content.Intent;
import android.os.Binder;
import android.os.IBinder;
import android.util.Log;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.greenrobot.eventbus.EventBus;
public class MQTTService extends Service {
private static MqttAndroidClient mqttAndroidClient;//声明
final String serverUri = "tcp://43.139.79.48:1883";//mqtt地址
String clientId = "hello";//客户端的ID
public static String subscriptionTopic = "test";//订阅的主题
public static String publishTopic = "test";//发布的主题
public MQTTService() {
}
//与mainactivit之间通信的
class MyBinder extends Binder{
//activity与service之间进行通信的桥梁
}
@Override
public IBinder onBind(Intent intent) {
// TODO: Return the communication channel to the service.
Log.i("MQTTService","mqttService start");
mqttinit();//mqtt的初始化
return new MyBinder();//返回Binder的实例
}
//mqtt的初始化
private void mqttinit() {
mqttAndroidClient = new MqttAndroidClient(getApplicationContext(), serverUri, clientId);
//设置回调函数
mqttAndroidClient.setCallback(new MqttCallbackExtended() {
//连接完成时,自动调用
@Override
public void connectComplete(boolean reconnect, String serverURI) {
Log.i("TempMqtt", "connectComplete: 连接成功");
//订阅主题
subscribeToTopic();
}
//连接失去的调用
@Override
public void connectionLost(Throwable cause) {
Log.i("TempMqtt", "connectComplete: 断开连接");
}
//接收到消息的时候
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.e("TempMqtt", "connectComplete: 收到消息"+message.toString());
String str=message.toString();
//接受的消息,发布到eventbus,post
EventBus.getDefault().post(new MessageEvent(str));
}
//发送消息完成
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
//完成 mqtt连接
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();//定义一个mqttConnectOptions
mqttConnectOptions.setAutomaticReconnect(true);//自动重连
mqttConnectOptions.setCleanSession(false);//不清除会话
try {
mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
//Toast.makeText("MQTTService", "连接成功", Toast.LENGTH_SHORT).show();
Log.i("MQTTService", "onSuccess: 连接成功");
//进行主题的订阅
subscribeToTopic();
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
// Toast.makeText(MainActivity.this, "连接失败", Toast.LENGTH_SHORT).show();
Log.i("MQTTService", "onSuccess: 连接失败");
}
});
} catch (MqttException ex){
ex.printStackTrace();
}
}
//订阅主题
public static void subscribeToTopic() {
try {
mqttAndroidClient.subscribe(subscriptionTopic, 0, null, new IMqttActionListener() {
//订阅成功的时候,就会调用
@Override
public void onSuccess(IMqttToken asyncActionToken) {
//Toast.makeText(MainActivity.this, "订阅成功", Toast.LENGTH_SHORT).show();
Log.i("TempMqtt", "onSuccess: 订阅成功");
}
//订阅失败的时候,就会调用
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
//Toast.makeText(MainActivity.this, "订阅失败", Toast.LENGTH_SHORT).show();
Log.i("TempMqtt", "onSuccess: 订阅失败");
}
});
} catch (MqttException ex){
Log.i("" +
"", "onSuccess: Exception whilst subscribing");
ex.printStackTrace();
}
}
//发布主题
public static void publishToTopic(String message){
try {
//参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后一条消息
mqttAndroidClient.publish(publishTopic, message.getBytes(), 0, false);
Log.i("TempMqtt", "publishToTopic: 发送消息为"+message);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
MessageEvent.java
//然后,我们定义一个事件的封装对象。在程序内部就使用该对象作为通信的信息:
public class MessageEvent {
public final String message;
public static MessageEvent getInstance(String message) {
return new MessageEvent(message);
}
MessageEvent(String message) {
this.message = message;
}
}
MainActivity.java
import android.content.ComponentName;
import android.content.Context;
import android.content.Intent;
import android.content.ServiceConnection;
import android.os.Bundle;
import android.os.IBinder;
import android.view.View;
import android.widget.TextView;
import android.widget.Toast;
import androidx.annotation.NonNull;
import androidx.appcompat.app.AppCompatActivity;
import org.greenrobot.eventbus.EventBus;
import org.greenrobot.eventbus.Subscribe;
import org.greenrobot.eventbus.ThreadMode;
import org.json.JSONException;
import org.json.JSONObject;
public class MainActivity extends AppCompatActivity {
private Intent mintent;
private TextView q;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
mintent=new Intent(this,MQTTService.class);
EventBus.getDefault().register(this);//注册EventBus
bindService(mintent,connection, Context.BIND_AUTO_CREATE);//绑定方式启动服务
q= (TextView) findViewById(R.id.aa);
q.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
//点击文字发布消息
MQTTService.publishToTopic("hahaah");
}
});
}
//当接收到MQTT服务器发送过来的数据的时候,就会调用此函数
// This method will be called when a MessageEvent is posted (in the UI thread for Toast)
//表示事件处理函数的线程在主线程(UI)线程,因此在这里不能进行耗时操作。
@Subscribe(threadMode = ThreadMode.MAIN,sticky = true)
public void onReceiveMsg(@NonNull MessageEvent event){
String str=event.message;
//在这里,可以进行UI界面的更新
UiShow(str);
}
private void UiShow(String str ){
try {
JSONObject jsonMessage=new JSONObject(str);
String temp= jsonMessage.getString("msg");
//Toast.makeText(this, temp, Toast.LENGTH_SHORT).show();
q.setText(temp);
} catch (JSONException e) {
e.printStackTrace();
}
}
//新建了一个 ServiceConnection 实例
private ServiceConnection connection=new ServiceConnection() {
//当service与activity连接的时候,调用此方法
@Override
public void onServiceConnected(ComponentName componentName, IBinder iBinder) {
}
//当service与activity失去连接的时候,调用此方法
@Override
public void onServiceDisconnected(ComponentName componentName) {
}
};
@Override
protected void onDestroy() {
EventBus.getDefault().unregister(this);//注销、
unbindService(connection);//解除绑定
stopService(mintent);//停止mqtt的服务
super.onDestroy();
}
}
在activity_main.xml中,给TextView赋值id
android:id="@+id/aa"
评论 (1)