之前做过一个深交所股票数据的接存储软件,消息的协议是这样。

协议文档在这  https://wenku.baidu.com/view/d102cd0b4a73f242336c1eb91a37f111f1850df2.html

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。

记一次golang的实践 随笔 第1张

由于socket接受来的数据会出现粘包或者半包的情况。所以要进行拆包处理代码是这样的,但是无论网络状态是否稳定都会出现丢包的情况。

记一次golang的实践 随笔 第2张
  private RecMessage TryReadAmessage(ref byte[] bytes)
        {
            if (bytes.Length < 12)
                return null;
            var msgType = NetworkBitConverter.ToInt32(bytes, 0);
            if (messageTypes.Contains(msgType))
            {

                var msgLength = NetworkBitConverter.ToInt32(bytes, 4);
                if (msgLength >= 0 && msgLength < 5000)
                {
                    if (bytes.Length < (msgLength + 12))
                        return null;

                    var outputByte = new byte[12 + msgLength];
                    Array.Copy(bytes, outputByte, msgLength + 12);
                    bytes = bytes.Removebytes(msgLength + 12);
                    var chkbyte = new byte[4];
                    Array.Copy(outputByte, msgLength + 8, chkbyte, 0, 4);
                    if (!GetBytesFomObj.CheckSum(chkbyte, outputByte)) return null;

                    return new RecMessage
                    {
                        MessageType = msgType,
                        MessageLenght = msgLength,
                        MessageContent = outputByte.Removebytes(8)
                    };
                }
                else
                    bytes = bytes.Removebytes(4);
            }
            else
                bytes = bytes.Removebytes(4);
            return null;
        }
View Code

后来用了supersocket框架拆包,效果不错,基本没有发现错误包的情况。看代码

记一次golang的实践 随笔 第4张
 1  public class ClientBinaryFilter : FixedHeaderReceiveFilter<RecMessage>
 2     {
 3         /// <summary>
 4         /// 所有消息类型
 5         /// </summary>
 6         private readonly int[] _messageTypes = new int[]
 7         {   1,
 8             3,
 9             8,
10             390013,
11             390019,
12             390012,
13             309011,
14             309111,
15             300111,
16             300192,
17             300191,
18             300611,
19             300592,
20             300792,
21             300591,
22             300791,
23             306311,
24             390090,
25             390095,
26             390094,
27             390093
28         };
29         public ClientBinaryFilter() : base(8)
30         {
31 
32         }
33         private int key;
34         private int datalength;
35         protected override int GetBodyLengthFromHeader(IBufferStream bufferStream, int length)
36         {
37             key = bufferStream.ReadInt32(false);
38             datalength = bufferStream.ReadInt32(false);
39             return datalength + 4;
40         }
41 
42         public override RecMessage ResolvePackage(IBufferStream bufferStream)
43         {
44             var buffer = new byte[datalength + 4];
45             bufferStream.Skip(8).Read(buffer, 0, datalength + 4);
46             return new RecMessage
47             {
48                 MessageType = key,
49                 MessageLenght = datalength,
50                 MessageContent = buffer
51             };
52         }
53     }
View Code

后来学习了golang语言,感觉非常适合做这种实时性很高的软件。由于逻辑不是特别复杂,没有用到所有的golang的特性,知识点有:net(socket),goruntime,channel(缓冲),数组切片slice,互斥锁sync.Mutex,redigo(redis),ffjson(序列化与反序列化)

核心的代码在这里

记一次golang的实践 随笔 第6张
 1 //socket接受函数
 2 func handMsgChannel(con net.Conn) {
 3     con.Write(SendLogOn())
 4     go sendHeartBt(con)
 5     var unCompleteBytes []byte
 6     for {
 7         //startTime := time.Now()
 8         buffer := make([]byte, 4096000)
 9         readLength, err := con.Read(buffer)
10         CheckError(err, con)
11         if readLength > 0 {
12             var readBuf []byte = buffer[0:readLength]
13             //获取上次为解析的半包并装载到此次循环中解析
14             if len(unCompleteBytes) != 0 {
15                 readBuf = lib.BytesCopy(unCompleteBytes, readBuf)
16             }
17             readMessageChannel(&readBuf) //拆包分解,并把半包数据留在下次循环处理
18             unCompleteBytes = readBuf
19             //endTime := time.Since(startTime)
20             //fmt.Println("读取并数据处理时间: ", endTime)
21         }
22     }
23 }
24 
25 //利用channel读取并传入channel
26 func readMessageChannel(input *[]byte) {
27     //var output []BaseMsgModel
28     for {
29         byteArray := *input
30         //头+长度+校验 最小12个字节,不足跳出
31         if len(byteArray) < 12 {
32             break
33         }
34         mt := lib.BytesToInt32(byteArray[0:4])
35         //获取数据头
36         if lib.Contain(mt, MsgTypes) {
37             ml := lib.BytesToInt32(byteArray[4:8])
38             //数据长度
39             if ml >= 0 && ml < 5000 {
40                 if len(byteArray) < 12+int(ml) {
41                     break
42                 }
43                 //获取报文content
44                 mc := byteArray[8 : 12+int(ml)]
45                 sum := lib.BytesToInt32(mc[len(mc)-4 : len(mc)])
46                 checksum := lib.CheckSum(byteArray[0 : 8+int(ml)])
47                 //检查校验
48                 if checksum == sum {
49                     model := &BaseMsgModel{
50                         mt,
51                         ml,
52                         mc,
53                     }
54                     chmsg <- model
55                     *input = byteArray[12+int(ml):]
56                 } else {
57                     *input = byteArray[12+int(ml):]
58                     fmt.Println("错误数据")
59                 }
60             } else {
61                 //错误长度移除
62                 *input = byteArray[8:]
63                 fmt.Println("错误长度")
64             }
65         } else {
66             //错误头部移除
67             *input = byteArray[4:]
68             fmt.Println("错误头")
69         }
70     }
71 }
72 
73 //分发存储channel中数据
74 func translateMsgChannel() {
75     for {
76         item := <-chmsg
77         //fmt.Print("数据类型", msgs.MsgType, "数据长度", msgs.MsgLength, msgs.MsgContent, "\r\n")
78         switch item.MsgType {
79         case 300111:
80             msg.Save300111(item.MsgContent)
81             systemParams.Params.Lock()
82             systemParams.msg300111Length++
83             systemParams.Params.Unlock()
84             break
85         case 309011:
86             msg.Save309011(item.MsgContent)
87             systemParams.Params.Lock()
88             systemParams.msg309011Length++
89             systemParams.Params.Unlock()
90             break
91         default:
92             break
93         }
94         systemParams.Params.Lock()
95         systemParams.RecLength++
96         systemParams.Params.Unlock()
97     }
98 }
View Code

经测试后不禁感叹golang真是黑科技,.net的代码内存会达到50M以上,CPU10%左右(I5 4核4线程)

golang呢10M左右 CPU 0.5%~1%,golang。。。牛啊!

代码在这  https://gitee.com/siming.liu/golang_stock

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄