package cloud.deergod.educationassessmentsystemapp.controller.component.tcp
import android.util.Log
import cloud.deergod.educationassessmentsystemapp.controller.component.event_bus.ApplicationScopeViewModelProvider
import cloud.deergod.educationassessmentsystemapp.controller.component.event_bus.EventMiddleware
import cloud.deergod.educationassessmentsystemapp.model.data.communication.ServerReflectData
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream
import java.net.InetSocketAddress
import java.net.Socket
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.coroutines.CoroutineContext
import kotlin.experimental.xor
// 定义一个类来继承协程使整个类拥有协程的上下文.
internal class TransparentTcp(private val currentUserId : String, private val serverIp : String,
private val serverPort : Int, private val lastUpdateTime : String) : CoroutineScope {
companion object{
private const val TAG : String = "TransparentTcp"
/**
* TCP 超时连接30s 因为TCP有重发机制,所以时间稍微久一点也可以
*/
private const val TCP_CONNECTION_TIMEOUT = 30 * 10000
private const val TCP_CONNECTION_TIME = 50000L // TCP重连
private const val TCP_MESSAGE_TIME = 300000L // tcp检测心跳
}
private val heartToken : ByteArray = "¶".toByteArray(Charsets.UTF_8);
private val job = Job()
override val coroutineContext: CoroutineContext = IO + job
private var tempTime = 0L // 临时保存时间
private var isStarted = false
private var socket: Socket? = null
private lateinit var socketIn: InputStream
private lateinit var socketOut: OutputStream
/**
* 开始工作协程
*/
private lateinit var startLauncher: Job
/**
* 心跳协程
*/
private var heartLauncher: Job? = null
/**
* 接收信息协程
*/
private var readLauncher: Job? = null
/**
* 监视反馈信息协程
*/
private var clientFeedbackLauncher : Job? = null
/**
* 发送信息协程
*/
private var sendLauncher: Job? = null
private val eventHub = ApplicationScopeViewModelProvider
.getApplicationScopeViewModel(EventMiddleware::class.java)
suspend fun startUp()
{
/*val coroutineExceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
throwable.printStackTrace() }
*/
if(!isStarted) {
startLauncher = withContext(coroutineContext) {
async {
startConnection()
}.await()
}
startLauncher.start()
}
}
fun stopWork(){
if(isStarted) {
closeJob()
try {
startLauncher.cancel()
} catch (ex: Exception) {
ex.printStackTrace()
}
}
}
private fun closeJob()
{
try {
clientFeedbackLauncher?.cancel()
clientFeedbackLauncher = null
}catch (ex : Exception){
ex.printStackTrace()
}
try {
heartLauncher?.cancel()
heartLauncher = null
}catch (ex : Exception){
ex.printStackTrace()
}
try {
sendLauncher?.cancel()
sendLauncher = null
}catch (ex : Exception){
ex.printStackTrace()
}
try {
readLauncher?.cancel()
readLauncher = null
}catch (ex : Exception){
ex.printStackTrace()
}
//startLauncher.cancel()
}
private suspend fun socketClose()
{
withContext(IO) {
eventHub.changeServerConnectState(-1)
}
try {
socketOut.close()
}catch (e : Exception){
e.printStackTrace()
}
try {
socketIn.close()
}catch (e : Exception){
e.printStackTrace()
}
try {
socket?.close()
}catch (e : Exception){
e.printStackTrace()
}
socket = null
}
private fun startConnection() =
launch(coroutineContext, CoroutineStart.LAZY) {
// 创建socket
while (true) {
try {
messageQueue.clear()
socket = Socket()
isStarted = true
Log.i(TAG, "连接tcp")
// 创建 socket连接地址
val inetSocketAddress = InetSocketAddress(serverIp, serverPort)
// 等待tcp连接成功,连接失败直接抛异常
socket?.connect(inetSocketAddress, 30000)
//tempTime = System.currentTimeMillis()
socketIn = socket?.getInputStream()!!
socketOut = socket?.getOutputStream()!!
readLauncher = readTcpData()
sendLauncher = sendMessageTcp()
//开始发送认证信息
//socketOut.write("identifier龘${currentUserId}龘${lastUpdateTime}".toByteArray(Charsets.UTF_8))
//socketOut.flush()
//delay(100)
messageQueueOffer("identifier龘${currentUserId}龘${lastUpdateTime}".toByteArray(Charsets.UTF_8));
heartLauncher = heartPackage()
clientFeedbackLauncher = clientFeedback()
// 先暂停5s,如果接收到tcp的消息(因为接收协程已经开启[readLaun]),则会继续往下阻塞
delay(5000)
// 阻塞当前的协程继续循环,等待要是出意外或者超时则会重新遍历继续重连tcp
async {
while (true) {
// 重复检测当前的时间是否超时
delay(1000)
// 接收到任何信息就代表连接成功,接收到信息之后会改变当前的时间
if (TCP_CONNECTION_TIMEOUT < System.currentTimeMillis() - tempTime) {
//messageCallBack.transparentConnectionTimeOut()
//job.cancelAndJoin()
closeJob()
socketClose()
// 跳出阻塞 再次循环外层的协程,重新连接tcp
return@async
}
}
}.await()
} catch (e: Exception) {
// 重新连接的时候将要对当前的协程阻塞进行关闭
e.printStackTrace()
closeJob()
socketClose()
Log.i(TAG, "start: 重新连接")
isStarted = false
delay(TCP_CONNECTION_TIME)
}
}
}
// 定义一个心跳包
private fun heartPackage(): Job = launch {
while (true) {
delay(TCP_MESSAGE_TIME)
messageQueueOffer(heartToken)
// 不要管里面的代码,只需要知道这是每隔一段时间发送一个心跳命令
// 在终端未认证之前要定时发送连接终端命令
/*if (!TransparentMessageManager.terminalAllow) {
TransparentMessageManager.sendConnectRequest()
} else {
TransparentMessageManager.sendLogRequest()
}*/
}
}
// 定义一个发送消息的队列
private var messageQueue = ConcurrentLinkedQueue<ByteArray>()
private fun messageQueueOffer(tcpMsg: ByteArray) {
synchronized(messageQueue) {
messageQueue.offer(tcpMsg)
}
}
/*
runBlocking {
val squares = produceTest()
squares.consumeEach { println("receive :$it") }
println("Done!")
}
flow{
emit(1)
}.flowOn(Dispatchers.IO).collect{ value ->
Log.d(TAG, "最终转换后的值 :${value}")
}
*/
private fun sendMessageTcp() = launch {
while (true) {
try {
// 定一个死循环 持续对这个队列进行读取
val message = messageQueue.poll()
// 读取的空消息 就暂停那么100毫秒,跳过此次循环在去遍历
if (message == null) {
delay(100)
continue
}
// 读取到消息之后直接发送
socketOut.write(message)
socketOut.flush()
// 防止粘包
delay(100)
} catch (e: IOException) {
e.printStackTrace()
}
}
}
//clientFeedbackLauncher
//(coroutineContext, CoroutineStart.LAZY)
private fun clientFeedback() : Job = launch{
launch {
eventHub.sendInstantMessage.collect{ message ->
Log.i(TAG, "收到客户消息${message.messageBody}")
}
}
launch {
eventHub.updateFeedback.collect{ updateStub ->
if(updateStub.isSuccessSave) {
messageQueueOffer(updateStub.dataId.toByteArray(Charsets.UTF_8))
}
}
}
}
// 定一个接收消息的队列,粘包处理我做的是按照字节处理.
private val receiveQueue = ConcurrentLinkedQueue<Byte>()
private fun readTcpData(): Job = launch {
launch {
var count = 0
var number: Int
val lengthByteArray = ByteArray(8)
while (true) {
number = 0
// 获取到消息长度 这样写是因为在if中写async.await()阻塞不了当前的协程,原因未知
// 换个方式写可以阻塞当前的协程也就是阻塞当前的while循环
(count == 8).isSuccess {
async {
// 取出来字节数组钱4为 加解密密钥
val seedBytes = lengthByteArray.copyOfRange(0, 4)
// 取出(copy)字节后四位,之后要取出数据的长度
val messageBytes = lengthByteArray.copyOfRange(4, 8)
val length = xorLengthEncode(messageBytes, seedBytes)
// 创建一个获取数据长度的字节数组 并且
val dataByteArray = ByteArray(length + 8)
while (number < length + 8) {
val poll = receiveQueue.poll() ?: continue
dataByteArray[number] = poll
if (number == length + 7) {
// 拿到当前的密钥,和当前的数据解析当前的数据
launch {
val serverData = pickupContent(dataByteArray, seedBytes)
eventHub.publishServerMessage(serverData)
}
}
number++
}
count = 0
}.await()
}
val poll = receiveQueue.poll()
if (poll == null) {
delay(100)
continue
}
lengthByteArray[count] = poll
count++
}
}
try {
val b = ByteArray(1024)
var length = 0
while (length != -1) {
length = socketIn.read(b)
if (length != -1) {
val tempByte = ByteArray(length)
tempByte.forEachIndexed { index, byte ->
receiveQueue.offer(b[index])
}
tempTime = System.currentTimeMillis()
launch {
eventHub.changeServerConnectState(1)
}
}
}
} catch (e: IOException) {
e.printStackTrace()
}
}
private inline fun Boolean.isSuccess(function: () -> Unit) {
if (this)
function()
}
//工具函数
private fun xorLengthEncode(messageBytes : ByteArray, seedBytes : ByteArray) : Int
{
//val intBuffer = ByteBuffer.wrap(messageBytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer()
//intBuffer1.get()
val lengthInt = ByteBuffer.wrap(messageBytes).order(ByteOrder.LITTLE_ENDIAN).int
val seedInt = ByteBuffer.wrap(seedBytes).order(ByteOrder.LITTLE_ENDIAN).int
return lengthInt xor seedInt
}
private fun pickupContent( messageBytes : ByteArray, seedBytes : ByteArray)
: ServerReflectData
{
//提取nPackageType,nPackageIndex
val seedInt = ByteBuffer.wrap(seedBytes).order(ByteOrder.LITTLE_ENDIAN).int //intBuffer1.get()
val packageIndexByteArray = ByteArray(4)
val packageTypeByteArray = ByteArray(4)
val finalContent = ByteArray(messageBytes.size - 8)
val finalContentOutput = ByteArray(messageBytes.size - 8)
System.arraycopy(messageBytes, 0, packageIndexByteArray, 0, 4)
System.arraycopy(messageBytes, 4, packageTypeByteArray, 0, 4)
System.arraycopy(messageBytes, 8, finalContent, 0, messageBytes.size - 8)
val currentIndex = ByteBuffer.wrap(packageIndexByteArray).order(ByteOrder.LITTLE_ENDIAN).int xor
seedInt
val currentType = ByteBuffer.wrap(packageTypeByteArray).order(ByteOrder.LITTLE_ENDIAN).int xor
seedInt
var tempByte : Byte
for((index, b) in finalContent.withIndex()){
tempByte = finalContent[index] xor seedBytes[0]
tempByte = tempByte xor seedBytes[1]
tempByte = tempByte xor seedBytes[2]
tempByte = tempByte xor seedBytes[3]
finalContentOutput[index] = tempByte
}
return ServerReflectData(currentIndex, currentType, String(finalContentOutput, Charsets.UTF_8))
}
}
//老师,上面就是我的代码,请指导
我的完整代码,请指导
- 写回答
- 好问题 0 提建议
- 追加酬金
- 关注问题
- 邀请回答
-
4条回答 默认 最新
- Kwan的解忧杂货铺@新空间代码工作室 2024-02-29 20:05关注
知更鸟1982 晚上好🌙🌙🌙
本答案参考通义千问你的代码是使用Kotlin编写的一个TCP通信类,它负责与服务器进行透明TCP连接,并处理心跳、发送消息和接收消息。看起来你想要解决的问题可能包括:
-
协程管理:
- 启动和停止协程:
startUp()
方法用于启动连接,stopWork()
用于关闭所有协程。 - 协程取消: 当需要停止时,使用
closeJob()
方法取消所有协程。
- 启动和停止协程:
-
连接管理:
- 重试连接: 使用
while
循环和delay
处理连接失败后的重试。 - 连接状态更新:
eventHub.changeServerConnectState()
用于更新连接状态。
- 重试连接: 使用
-
心跳和消息发送:
- 心跳包:
heartPackage()
每段时间发送心跳包。 - 消息发送:
sendMessageTcp()
持续读取messageQueue
并发送消息。
- 心跳包:
-
消息接收和解码:
- 粘包处理:
readTcpData()
使用ConcurrentLinkedQueue
处理粘包问题。 - 解码数据:
pickupContent()
函数用于解密接收到的数据。
- 粘包处理:
-
异常处理:
- 异常打印: 在
catch
块中打印异常堆栈。
- 异常打印: 在
对于代码的优化建议,你可以考虑以下几点:
-
协程管理:
- 可以考虑使用
CoroutineScope
的launch
函数,而不是手动创建Job
,这样更简洁。 - 使用
try-catch
结构捕获异常并统一处理,避免代码重复。
- 可以考虑使用
-
连接重试:
- 可以将重试逻辑封装成一个独立的方法,提高代码可读性。
-
线程安全:
- 对
messageQueue
和receiveQueue
的操作应确保线程安全,比如使用synchronized
或ReentrantLock
。
- 对
-
协程取消:
- 如果
startLauncher
、heartLauncher
等协程在某个条件满足时应该被取消,可以考虑在对应的协程内部添加job.cancelWhen()
。
- 如果
-
日志和错误处理:
- 添加更详细的日志,以便于调试,如
socketClose()
方法中的e.printStackTrace()
。
- 添加更详细的日志,以便于调试,如
下面是修改后的简化版代码,针对上述建议进行了优化:
import kotlinx.coroutines.* import java.io.IOException import java.net.InetSocketAddress import java.net.Socket import java.nio.ByteBuffer import java.nio.ByteOrder import java.util.concurrent.ConcurrentLinkedQueue class TransparentTcp(private val currentUserId: String, private val serverIp: String, private val serverPort: Int, private val lastUpdateTime: String) : CoroutineScope { // ...其他不变的部分... private val heartToken = "¶".toByteArray(Charsets.UTF_8) private val connectionRetryInterval = 5000L private val messageQueue = ConcurrentLinkedQueue<ByteArray>() override val coroutineContext: CoroutineContext = Dispatchers.IO + Job() // ...其他不变的方法... suspend fun startUp() { startLauncher = launch { startConnection() } startLauncher.start() } fun stopWork() { cancelAllLaunchers() } // ...其他不变的方法... private suspend fun startConnection() { repeatWhile { isNotConnected() } { try { // ...连接代码... readLauncher = readTcpData() sendLauncher = sendMessageTcp() heartLauncher = heartPackage() clientFeedbackLauncher = clientFeedback() // ...其他不变的部分... } catch (e: Exception) { // ...异常处理... delay(connectionRetryInterval) } } } // ...其他不变的方法... private suspend fun readTcpData() { // ...处理粘包的代码... } // ...其他不变的方法... // 添加一个方法来取消所有协程 private suspend fun cancelAllLaunchers() { heartLauncher?.cancel() sendLauncher?.cancel() readLauncher?.cancel() clientFeedbackLauncher?.cancel() } // ...其他不变的方法... }
注意:以上代码只是一部分简化,具体实现可能还需要根据你的项目需求进行调整。
解决 1无用 -
悬赏问题
- ¥50 有没有适合匹配类似图中的运动规律的图像处理算法
- ¥15 dnat基础问题,本机发出,别人返回的包,不能命中
- ¥15 请各位帮我看看是哪里出了问题
- ¥15 vs2019的js智能提示
- ¥15 关于#开发语言#的问题:FDTD建模问题图中代码没有报错,但是模型却变透明了
- ¥15 uniapp的h5项目写一个抽奖动画
- ¥15 hadoop中启动hive报错如下怎么解决
- ¥15 如何优化QWebEngineView 加载url的速度
- ¥15 关于#hadoop#的问题,请各位专家解答!
- ¥15 如何批量抓取网站信息