知更鸟1982 2024-02-29 20:04 采纳率: 0%
浏览 6

我的完整代码,请指导


package cloud.deergod.educationassessmentsystemapp.controller.component.tcp

import android.util.Log
import cloud.deergod.educationassessmentsystemapp.controller.component.event_bus.ApplicationScopeViewModelProvider
import cloud.deergod.educationassessmentsystemapp.controller.component.event_bus.EventMiddleware
import cloud.deergod.educationassessmentsystemapp.model.data.communication.ServerReflectData
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers.IO
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream
import java.net.InetSocketAddress
import java.net.Socket
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.coroutines.CoroutineContext
import kotlin.experimental.xor


// 定义一个类来继承协程使整个类拥有协程的上下文.
internal class TransparentTcp(private val currentUserId : String, private val serverIp : String,
                              private val serverPort : Int, private val lastUpdateTime : String) : CoroutineScope {
    companion object{
        private const val TAG : String = "TransparentTcp"
        /**
         * TCP 超时连接30s 因为TCP有重发机制,所以时间稍微久一点也可以
         */
        private const val TCP_CONNECTION_TIMEOUT = 30 * 10000

        private const val TCP_CONNECTION_TIME = 50000L // TCP重连
        private const val TCP_MESSAGE_TIME = 300000L // tcp检测心跳
    }

    private val heartToken : ByteArray = "¶".toByteArray(Charsets.UTF_8);
    private val job = Job()
    override val coroutineContext: CoroutineContext = IO + job

    private var tempTime = 0L // 临时保存时间

    private var isStarted = false

    private var socket: Socket? = null

    private lateinit var socketIn: InputStream

    private lateinit var socketOut: OutputStream

    /**
     * 开始工作协程
     */
    private lateinit var startLauncher: Job

    /**
     * 心跳协程
     */
    private var heartLauncher: Job? = null

    /**
     * 接收信息协程
     */
    private var readLauncher: Job? = null

    /**
     * 监视反馈信息协程
     */
    private var clientFeedbackLauncher : Job? = null


    /**
     * 发送信息协程
     */
    private var sendLauncher: Job? = null

    private val eventHub = ApplicationScopeViewModelProvider
        .getApplicationScopeViewModel(EventMiddleware::class.java)


    suspend fun startUp()
    {
        /*val coroutineExceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
            throwable.printStackTrace() }
        */
        if(!isStarted) {
            startLauncher = withContext(coroutineContext) {
                async {
                    startConnection()
                }.await()
            }
            startLauncher.start()
        }
    }

    fun stopWork(){
        if(isStarted) {
            closeJob()
            try {
                startLauncher.cancel()
            } catch (ex: Exception) {
                ex.printStackTrace()
            }
        }
    }

    private fun closeJob()
    {
        try {
            clientFeedbackLauncher?.cancel()
            clientFeedbackLauncher = null
        }catch (ex : Exception){
            ex.printStackTrace()
        }
        try {
            heartLauncher?.cancel()
            heartLauncher = null
        }catch (ex : Exception){
            ex.printStackTrace()
        }
        try {
            sendLauncher?.cancel()
            sendLauncher = null
        }catch (ex : Exception){
            ex.printStackTrace()
        }
        try {
            readLauncher?.cancel()
            readLauncher = null
        }catch (ex : Exception){
            ex.printStackTrace()
        }
        //startLauncher.cancel()
    }

    private suspend fun socketClose()
    {
        withContext(IO) {
            eventHub.changeServerConnectState(-1)
        }


        try {
            socketOut.close()
        }catch (e : Exception){
            e.printStackTrace()
        }
        try {
            socketIn.close()
        }catch (e : Exception){
            e.printStackTrace()
        }
        try {
            socket?.close()
        }catch (e : Exception){
            e.printStackTrace()
        }
        socket = null
    }


    private fun startConnection() =
        launch(coroutineContext, CoroutineStart.LAZY) {
            // 创建socket
            while (true) {
                try {
                    messageQueue.clear()
                    socket = Socket()
                    isStarted = true
                    Log.i(TAG, "连接tcp")
                    // 创建 socket连接地址
                    val inetSocketAddress = InetSocketAddress(serverIp, serverPort)
                    // 等待tcp连接成功,连接失败直接抛异常
                    socket?.connect(inetSocketAddress, 30000)

                    //tempTime = System.currentTimeMillis()

                    socketIn = socket?.getInputStream()!!
                    socketOut = socket?.getOutputStream()!!

                    readLauncher = readTcpData()
                    sendLauncher = sendMessageTcp()

                    //开始发送认证信息
                    //socketOut.write("identifier龘${currentUserId}龘${lastUpdateTime}".toByteArray(Charsets.UTF_8))
                    //socketOut.flush()
                    //delay(100)


                    messageQueueOffer("identifier龘${currentUserId}${lastUpdateTime}".toByteArray(Charsets.UTF_8));

                    heartLauncher = heartPackage()
                    clientFeedbackLauncher = clientFeedback()

                    // 先暂停5s,如果接收到tcp的消息(因为接收协程已经开启[readLaun]),则会继续往下阻塞
                    delay(5000)

                    // 阻塞当前的协程继续循环,等待要是出意外或者超时则会重新遍历继续重连tcp
                    async {
                        while (true) {
                            // 重复检测当前的时间是否超时
                            delay(1000)
                            // 接收到任何信息就代表连接成功,接收到信息之后会改变当前的时间
                            if (TCP_CONNECTION_TIMEOUT < System.currentTimeMillis() - tempTime) {
                                //messageCallBack.transparentConnectionTimeOut()
                                //job.cancelAndJoin()
                                closeJob()
                                socketClose()
                                // 跳出阻塞 再次循环外层的协程,重新连接tcp
                                return@async
                            }
                        }
                    }.await()
                } catch (e: Exception) {
                    // 重新连接的时候将要对当前的协程阻塞进行关闭
                    e.printStackTrace()
                    closeJob()
                    socketClose()
                    Log.i(TAG, "start: 重新连接")
                    isStarted = false
                    delay(TCP_CONNECTION_TIME)
                }
            }
        }

    // 定义一个心跳包
    private fun heartPackage(): Job = launch {
        while (true) {
            delay(TCP_MESSAGE_TIME)
            messageQueueOffer(heartToken)
            // 不要管里面的代码,只需要知道这是每隔一段时间发送一个心跳命令
            // 在终端未认证之前要定时发送连接终端命令
            /*if (!TransparentMessageManager.terminalAllow) {
                TransparentMessageManager.sendConnectRequest()
            } else {
                TransparentMessageManager.sendLogRequest()
            }*/
        }
    }

    // 定义一个发送消息的队列
    private var messageQueue = ConcurrentLinkedQueue<ByteArray>()

    private fun messageQueueOffer(tcpMsg: ByteArray) {
        synchronized(messageQueue) {
            messageQueue.offer(tcpMsg)
        }
    }

    /*
        runBlocking {
        val squares = produceTest()
        squares.consumeEach { println("receive :$it") }
        println("Done!")
    }
        flow{
            emit(1)
        }.flowOn(Dispatchers.IO).collect{ value ->
            Log.d(TAG, "最终转换后的值 :${value}")
        }
     */

    private fun sendMessageTcp() = launch {
        while (true) {
            try {
                // 定一个死循环 持续对这个队列进行读取
                val message = messageQueue.poll()
                // 读取的空消息 就暂停那么100毫秒,跳过此次循环在去遍历
                if (message == null) {
                    delay(100)
                    continue
                }
                // 读取到消息之后直接发送
                socketOut.write(message)
                socketOut.flush()

                // 防止粘包
                delay(100)


            } catch (e: IOException) {
                e.printStackTrace()
            }
        }
    }

    //clientFeedbackLauncher

    //(coroutineContext, CoroutineStart.LAZY)
    private fun clientFeedback() : Job = launch{
        launch {
            eventHub.sendInstantMessage.collect{ message ->
                Log.i(TAG, "收到客户消息${message.messageBody}")
            }
        }
        launch {
            eventHub.updateFeedback.collect{ updateStub ->
                if(updateStub.isSuccessSave) {
                    messageQueueOffer(updateStub.dataId.toByteArray(Charsets.UTF_8))
                }
            }
        }
    }

    // 定一个接收消息的队列,粘包处理我做的是按照字节处理.
    private val receiveQueue = ConcurrentLinkedQueue<Byte>()
    private fun readTcpData(): Job = launch {
        launch {
            var count = 0
            var number: Int
            val lengthByteArray = ByteArray(8)
            while (true) {
                number = 0
                // 获取到消息长度 这样写是因为在if中写async.await()阻塞不了当前的协程,原因未知
                // 换个方式写可以阻塞当前的协程也就是阻塞当前的while循环
                (count == 8).isSuccess {
                    async {
                        // 取出来字节数组钱4为 加解密密钥
                        val seedBytes = lengthByteArray.copyOfRange(0, 4)
                        // 取出(copy)字节后四位,之后要取出数据的长度
                        val messageBytes = lengthByteArray.copyOfRange(4, 8)
                        val length = xorLengthEncode(messageBytes, seedBytes)
                        // 创建一个获取数据长度的字节数组 并且
                        val dataByteArray = ByteArray(length + 8)
                        while (number < length + 8) {
                            val poll = receiveQueue.poll() ?: continue
                            dataByteArray[number] = poll
                            if (number == length + 7) {
                                // 拿到当前的密钥,和当前的数据解析当前的数据
                                launch {
                                    val serverData = pickupContent(dataByteArray, seedBytes)
                                    eventHub.publishServerMessage(serverData)
                                }
                            }
                            number++
                        }
                        count = 0
                    }.await()
                }
                val poll = receiveQueue.poll()
                if (poll == null) {
                    delay(100)
                    continue
                }
                lengthByteArray[count] = poll
                count++
            }
        }

        try {
            val b = ByteArray(1024)
            var length = 0
            while (length != -1) {
                length = socketIn.read(b)
                if (length != -1) {
                    val tempByte = ByteArray(length)
                    tempByte.forEachIndexed { index, byte ->
                        receiveQueue.offer(b[index])
                    }
                    tempTime = System.currentTimeMillis()
                    launch {
                        eventHub.changeServerConnectState(1)
                    }
                }
            }
        } catch (e: IOException) {
            e.printStackTrace()
        }
    }

    private inline fun Boolean.isSuccess(function: () -> Unit) {
        if (this)
            function()
    }


    //工具函数

    private fun xorLengthEncode(messageBytes : ByteArray, seedBytes : ByteArray) : Int
    {
        //val intBuffer = ByteBuffer.wrap(messageBytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer()
        //intBuffer1.get()
        val lengthInt = ByteBuffer.wrap(messageBytes).order(ByteOrder.LITTLE_ENDIAN).int
        val seedInt = ByteBuffer.wrap(seedBytes).order(ByteOrder.LITTLE_ENDIAN).int

        return lengthInt xor seedInt
    }

    private fun pickupContent( messageBytes : ByteArray, seedBytes : ByteArray)
     : ServerReflectData
    {
        //提取nPackageType,nPackageIndex
        val seedInt = ByteBuffer.wrap(seedBytes).order(ByteOrder.LITTLE_ENDIAN).int //intBuffer1.get()
        val packageIndexByteArray = ByteArray(4)
        val packageTypeByteArray = ByteArray(4)
        val finalContent = ByteArray(messageBytes.size - 8)
        val finalContentOutput = ByteArray(messageBytes.size - 8)
        System.arraycopy(messageBytes, 0, packageIndexByteArray, 0, 4)
        System.arraycopy(messageBytes, 4, packageTypeByteArray, 0, 4)
        System.arraycopy(messageBytes, 8, finalContent, 0, messageBytes.size - 8)
        val currentIndex = ByteBuffer.wrap(packageIndexByteArray).order(ByteOrder.LITTLE_ENDIAN).int xor
                seedInt
        val currentType = ByteBuffer.wrap(packageTypeByteArray).order(ByteOrder.LITTLE_ENDIAN).int xor
                seedInt
        var tempByte : Byte
        for((index, b) in finalContent.withIndex()){
            tempByte = finalContent[index] xor seedBytes[0]
            tempByte = tempByte xor seedBytes[1]
            tempByte = tempByte xor seedBytes[2]
            tempByte = tempByte xor seedBytes[3]
            finalContentOutput[index] = tempByte
        }

        return ServerReflectData(currentIndex, currentType, String(finalContentOutput, Charsets.UTF_8))
    }
}

//老师,上面就是我的代码,请指导
  • 写回答

4条回答 默认 最新

  • 关注

    知更鸟1982 晚上好🌙🌙🌙
    本答案参考通义千问

    你的代码是使用Kotlin编写的一个TCP通信类,它负责与服务器进行透明TCP连接,并处理心跳、发送消息和接收消息。看起来你想要解决的问题可能包括:

    1. 协程管理:

      • 启动和停止协程: startUp()方法用于启动连接,stopWork()用于关闭所有协程。
      • 协程取消: 当需要停止时,使用closeJob()方法取消所有协程。
    2. 连接管理:

      • 重试连接: 使用while循环和delay处理连接失败后的重试。
      • 连接状态更新: eventHub.changeServerConnectState()用于更新连接状态。
    3. 心跳和消息发送:

      • 心跳包: heartPackage()每段时间发送心跳包。
      • 消息发送: sendMessageTcp()持续读取messageQueue并发送消息。
    4. 消息接收和解码:

      • 粘包处理: readTcpData()使用ConcurrentLinkedQueue处理粘包问题。
      • 解码数据: pickupContent()函数用于解密接收到的数据。
    5. 异常处理:

      • 异常打印: 在catch块中打印异常堆栈。

    对于代码的优化建议,你可以考虑以下几点:

    • 协程管理:

      • 可以考虑使用CoroutineScopelaunch函数,而不是手动创建Job,这样更简洁。
      • 使用try-catch结构捕获异常并统一处理,避免代码重复。
    • 连接重试:

      • 可以将重试逻辑封装成一个独立的方法,提高代码可读性。
    • 线程安全:

      • messageQueuereceiveQueue的操作应确保线程安全,比如使用synchronizedReentrantLock
    • 协程取消:

      • 如果startLauncherheartLauncher等协程在某个条件满足时应该被取消,可以考虑在对应的协程内部添加job.cancelWhen()
    • 日志和错误处理:

      • 添加更详细的日志,以便于调试,如socketClose()方法中的e.printStackTrace()

    下面是修改后的简化版代码,针对上述建议进行了优化:

    import kotlinx.coroutines.*
    import java.io.IOException
    import java.net.InetSocketAddress
    import java.net.Socket
    import java.nio.ByteBuffer
    import java.nio.ByteOrder
    import java.util.concurrent.ConcurrentLinkedQueue
    
    class TransparentTcp(private val currentUserId: String, private val serverIp: String, private val serverPort: Int, private val lastUpdateTime: String) : CoroutineScope {
        // ...其他不变的部分...
    
        private val heartToken = "¶".toByteArray(Charsets.UTF_8)
        private val connectionRetryInterval = 5000L
        private val messageQueue = ConcurrentLinkedQueue<ByteArray>()
    
        override val coroutineContext: CoroutineContext = Dispatchers.IO + Job()
    
        // ...其他不变的方法...
    
        suspend fun startUp() {
            startLauncher = launch {
                startConnection()
            }
            startLauncher.start()
        }
    
        fun stopWork() {
            cancelAllLaunchers()
        }
    
        // ...其他不变的方法...
    
        private suspend fun startConnection() {
            repeatWhile { isNotConnected() } {
                try {
                    // ...连接代码...
                    readLauncher = readTcpData()
                    sendLauncher = sendMessageTcp()
                    heartLauncher = heartPackage()
                    clientFeedbackLauncher = clientFeedback()
    
                    // ...其他不变的部分...
                } catch (e: Exception) {
                    // ...异常处理...
                    delay(connectionRetryInterval)
                }
            }
        }
    
        // ...其他不变的方法...
    
        private suspend fun readTcpData() {
            // ...处理粘包的代码...
        }
    
        // ...其他不变的方法...
    
        // 添加一个方法来取消所有协程
        private suspend fun cancelAllLaunchers() {
            heartLauncher?.cancel()
            sendLauncher?.cancel()
            readLauncher?.cancel()
            clientFeedbackLauncher?.cancel()
        }
    
        // ...其他不变的方法...
    }
    

    注意:以上代码只是一部分简化,具体实现可能还需要根据你的项目需求进行调整。

    评论

报告相同问题?

问题事件

  • 创建了问题 2月29日

悬赏问题

  • ¥50 有没有适合匹配类似图中的运动规律的图像处理算法
  • ¥15 dnat基础问题,本机发出,别人返回的包,不能命中
  • ¥15 请各位帮我看看是哪里出了问题
  • ¥15 vs2019的js智能提示
  • ¥15 关于#开发语言#的问题:FDTD建模问题图中代码没有报错,但是模型却变透明了
  • ¥15 uniapp的h5项目写一个抽奖动画
  • ¥15 hadoop中启动hive报错如下怎么解决
  • ¥15 如何优化QWebEngineView 加载url的速度
  • ¥15 关于#hadoop#的问题,请各位专家解答!
  • ¥15 如何批量抓取网站信息