75142913在线留言
GO语言学习笔记11:chan管道与goroutine协程实现数万并发详解_Go语言_网络人

GO语言学习笔记11:chan管道与goroutine协程实现数万并发详解

Kwok 发表于:2020-10-12 14:12:56 点击:0 评论: 0

GO语言天生的并发支持是很多人学习的目的,也是理解的难点之一,需要充分理解了程序运行机制后才可以把协程并发玩起来。因为我们需要通过GO语言的管道概念来做为控制协程的运行。

一、了解进程与线程

学习协程并发之前需要了解一下系统的进程和线程,进程就是程序在操作的一次执行过程,GO语言goroutine编译后有1个主进程和数个协助进程(简称协程)分配到不同CPU去同时执行操作。主进程是系统进行资源分配和调度的基本单位,线程是进程的一个执行实例,是程序执行的最小单元,它比进程更小的能独立运行的基本单位。一个进程可以创建销毁多个线程,同一个进程中的多个线程可以并发执行。一个程序至少有一个进程,而一个进程至少有一个线程。

GO语言学习笔记11chan管道与goroutine协程实现数万并发详解

 二、关于并发和并行

多线程程序在单核上运行即:并发,多线程程序在多核心上同时运行即:并行。

GO语言学习笔记11chan管道与goroutine协程实现数万并发详解

因为是在一个CPU上执行,如果有10个任务线程,每个线程执行10毫秒(轮巡),实际上程序在微观上不执行的时候是被挂起状态的。

GO语言学习笔记11chan管道与goroutine协程实现数万并发详解

多核CPU之所以比单核快,就是因为并行的方式同时执行多个任务,虽然我们使用多任务操作系统,但多核CPU才真正的实现核心数的任务同时执行。多核心执行时也同时遵守单核的并发轮询方式处理。

三、GO语言的主进程与协程

在较早的程序设计中只有一个主进程来执行全部的操作,由于硬件发展速度远远了超过软件,后面开发的程序不能充分的利用新硬件的全部性能,所以软件开发者们又给主进程增加一个多线程来解决并发提升处理速度从而充分利用系统资源,虽然多线程是一个轻量级的进程但是多线程还是非常的吃资源并会大量占用内存和CPU。

GO语言的设计者们对传统的主进程+线程的方式进行了优化并提出了:主程序+协程来解决CPU全部同时工作(不会向上图一样出现空闲情况),GO的线程就变成更轻巧,而且在充分利用硬件的同时减少了资源浪费问题。GO语言使用了资源共享方式提升了效率,把每个线程都使用逻辑态隔开,所以GO语言中把这种解决方案称为协程,我们可以把协程理解为更轻量级的线程

package main

import "fmt"
import "time"

func WriteDate(n int, c chan int) {
	for i := 1; i <= n; i++ {
		fmt.Println("协助程写入数据:", i)
		c <- i * n //管道只有1个大小,只能1个1个写入
		//只要编译器发现有读取这个管道的操作这里就会阻塞直到读取后让出位置再写入,没有读管道会报错塞满

	}
	close(c) //当管道完成自己的任务后就应该关闭,否则使用FOR读到一个未关闭的管道也会报错
}

func ReadDate(c chan int, b chan bool) {
	for {
		v, ok := <-c
		if !ok {
			break
		}
		fmt.Println("协程读取数据:", v)
		time.Sleep(time.Second) //休眠1秒测试GO编译器的异步处理机制
	}
	b <- true //读取的任务完成了后给锁主进程的FOT一个通知,可以继续向下执行了
	close(b)  //通知已完成,管道需要关闭
}
func main() {
	var c = make(chan int, 1)  //管道大小设置为1,用于测试go编译器自动阻塞机制
	var b = make(chan bool, 1) //用于锁住主进程直到协程要做的事完成后才返回数据

	go WriteDate(10, c) //启动1个协程写入数据
	go ReadDate(c, b)   //再启动一个协程处理读数据

	for f := 1; ; f++ {
		fmt.Println("使用for无限循环锁定主进程!", f)
		_, ok := <-b
		if !ok {
			fmt.Println("数据读取完毕!解除锁定!", f)
			break
		}
	}
}

 上面通过goroutine协程处理数据的运行示意图:

GO语言学习笔记11chan管道与goroutine协程实现数万并发详解

在GO里面可以轻松的写数千个协程(for i:=0;i<5000;i++{go func()}),GO的协程有独立的栈空间,共享主进程的堆空间,调试由程序员控制,协程是轻量级的线程。而进程无法自主控制只能由操作系统去调度。

GO语言学习笔记11chan管道与goroutine协程实现数万并发详解

主程序是一个物理线程,直接运行在CPU上,主线程是重量级的,非常耗费CPU和内存资源。,而协程需要从主程序开启,是轻量级的线程,存在于一个逻辑态,由于共享和主线程的资源,所以资源相对消耗比较小。GOlang的协程机制是重要的特点,其它编程语言是基于线程,开启过多的线程资源耗费大,这里就突显出GO的并发优势。

goroutine调度是一个MPG模式:

M 代表着一个内核线程,也可以称为一个工作线程。goroutine就是跑在单个(并发)M之上或多个CPU就可以调度多个M线程(并行),M1,M2,Mn...

P 代表着(Processor)处理器 它的主要用途就是用来执行goroutine的,一个P代表执行一个Go代码片段的基础(可以理解为上下文环境),所以它也维护了一个可运行的goroutine队列,和自由的goroutine队列,里面存储了所有需要它来执行的goroutine。

G 代表着goroutine 实际的数据结构(就是你封装的那个方法),并维护goroutine 需要的栈、程序计数器以及它所在的M等信息。

Seched 代表着一个调度器 它维护有存储空闲的M队列和空闲的P队列,可运行的G队列,自由的G队列以及调度器的一些状态信息等。

GO程序在1.8前需要设置CPU个数,但新版本默认运行在多个CPU上,不再需要设置,GO内置了一个可读取物理CPU数据的方法作为知识点了解:

package main

import "fmt"
import "runtime"

func main() {
	num := runtime.NumCPU() - 1 //获取当前系统CPU数量并保留一个CPU给其它程序使用
	runtime.GOMAXPROCS(num) //设置当CPU数减去1个来运行这个程序
	fmt.Println("当前使用", num, "个CPU来运行")
	runtime.Goexit() //退出当前协程,与return效果是不一样的哦~Goexit会退出go关键的任务
	runtime.Gosched() //Gosched会让出CPU时间片给上一个协程使用。
}

 四、互斥锁VS通道

上面我们介绍了GO语言独有的协程处理高并发方式,这里做为一个需要了解的知识点来看看其它开发语言怎么处理多个进程的。通过对比你会发现GO在使用协程实现多并发的时候具有天生的优势。

当需要对某一个数据空间同时使用多线程操作的时候,会出现资源竞争情况,GO也提供了一个“互斥锁”sync.mutex方式来对需要处理的数据开启锁保护功能。 

import "sync" //引用互斥锁
var myMap = make(map[int]int, 10) //定义一个map,用于存储int数据
var lock sync.Mutex               //定义一把锁

func main() {
	for i := 0; i < 20; i++ {
		//通过for循环开启20个协程
		go func(n int) {
			res := 1
			for j := 0; j <= n; j++ {
				res = res + n
			}
			fmt.Printf("锁住myMap[%d],现在协程%d正在运行中...计算结果为:%dn", n, n, res)
			lock.Lock()    //锁住myMap不让其它协程数据写入,否则会报fatal error: concurrent map writes
			myMap[n] = res //把运行结果写入到myMap
			fmt.Printf("解锁myMap[%d]n", n)
			lock.Unlock() //将myMap解锁 其它可协程写入
		}(i)
	}
	time.Sleep(time.Second) //休眠1秒让协程运行
}
/*锁住myMap[5],现在协程5正在运行中...计算结果为:31
解锁myMap[5]
锁住myMap[19],现在协程19正在运行中...计算结果为:381
解锁myMap[19]
锁住myMap[12],现在协程12正在运行中...计算结果为:157
解锁myMap[12]
锁住myMap[14],现在协程14正在运行中...计算结果为:211
解锁myMap[14]
锁住myMap[16],现在协程16正在运行中...计算结果为:273
解锁myMap[16]
锁住myMap[18],现在协程18正在运行中...计算结果为:343
解锁myMap[18]
锁住myMap[0],现在协程0正在运行中...计算结果为:1
解锁myMap[0]
........*/

我们可以在编译运行的时候加入-race参数看到资竞争报告,现在CPU运算速度很快。理论上1秒足够跑完上面的运算,但这是不科学且无法预知的,所以底层可以出现资源争夺的情况。示意图可以根据上图脑补~

sync包提供了基本的同步基元,如互斥锁。除了Once和WaitGroup类型,大部分都是适用于低水平程序线程,高水平的同步使用channel通信更好一些。

根据上面的代码,我们使用通道重构代码如下:

var limint = 20 //限制协程数量
func Compute(c chan int, n int) {
	res := 1
	i := 0
	for ; i <= n; i++ {
		res = res + n
	}
	fmt.Printf("现在协程%d正在运行中...计算结果为:%dn", n, res)
	c <- res //把运行结果写入到通道c
	//什么时候关闭通道,是协程通信中最重要的,很多时候我们都使用一个专门的通道来做为开关
	//为了更好了解通道并发运行机制,本演示只使用1个通道进行数据存放与通信工作。
	if limint == i {
		close(c) //当完成了最后一个并发计算并写入后关闭通道c
	}
}

func main() {
	var intChan = make(chan int, 4) //定义一个长度为4的通道,用于存放int值
	fmt.Println("主进程开始了,下面将交给limint =", limint, "个协程处理")
	for i := 0; i <= limint; i++ {
		go Compute(intChan, i) //通过for循环开启20个协程
	}
	for {
		_, ok := <-intChan //不断去读取intChan管道里的数据并抛弃值,读取成功ok返回true,否则返回false,如果还有协助正在写入,就会挂起等新数据
		if ok == false {   //当close后,数据也读取完毕ok就会返回false
			fmt.Println("协程运行完毕!解除主进程锁定!")
			break
		} //管道阻塞的机制,intChan没有close,但读不到数据的时候会挂等新的数据写入或者close
	}

}

/*
主进程开始了,下面将交给limint = 20 个协程处理
现在协程20正在运行中...计算结果为:421
现在协程16正在运行中...计算结果为:273
......
协程运行完毕!解除主进程锁定!
*/

可以通过上面代码看到,我们只使用了一个通道写入数据并完成了通信工作,而且没有主进程休眠等协程的情况,一但协程完成了任务就会告诉主进程可以跳出for解除锁定了。

这里再次通过协程运行示意图演示:

 

GO语言学习笔记11chan管道与goroutine协程实现数万并发详解

五、通道定义与使用的几种方式:

上面的代码里我们使用了很多次的通道,通道我们可以抽象为一个自来水管,我们可以使用<-来读取或者写入数据,看下面通道的抽象示意图:

GO语言学习笔记11chan管道与goroutine协程实现数万并发详解

channel本质就是一个数据结构队列,数据是先进先出【FIFO:First In First Out】,通过上图可以看到多个goroutine协程访问channel时按进入的顺序依次读出,不需要加锁,所以channel(管道)本身就是线程安全的,不会发生资源竞争的问题。

GO做为一个强类型语言,管道channel在定义的时候需要指定要存入的数据类型。

var intChan chan int                     //定义一个int数据类型的空管道
fmt.Println("intChan =", intChan)        //intChan = 
intChan = make(chan int, 5)              //使用make给intChan分配空间
fmt.Println("make后的intChan =", intChan)  //make后的intChan = 0xc00007c000
var strChan = make(chan string, 10)      //定义string类型并同时分配10个空间
fmt.Println("strChan =", strChan)        //strChan = 0xc000052180
mapChan := make(chan map[int]string, 20) //使用类型推导定义一个map类型的管道
fmt.Printf("mapChan 的值=%v 本身的地址为:%pn", mapChan, &mapChan)
//mapChan 的值=0xc0000521e0 本身的地址为:0xc00000e030
//通过上面的值我们可以看到管道其实是一个引用类型

intChan <- 10          //在intChan管道里放入一个值
var getInt = <-intChan //读取intChan的值并赋给getInt
fmt.Printf("getInt的值=%v 本身的类型为:%T", getInt, getInt)
//getInt的值=10 本身的类型为:int

管理make时指定了空间(上面示意图管子的长度是固定的,0为无缓存通道),超过了会报错,所以管道最大的价格是流动性,边写边读,而GO语言的编译器有阻塞机制,当管道容量不够时就会等数据被取出去再写进去。一但管道被关闭数据只能读取,不能写入,当取完最后一个数据,结果就会返回false。

我们可以定义一个interface{}类型来存放任意数据类型,当我们需要取回interface{}类型的时候需要使用类型断言

type student struct {
	name string
	age  int
}

func main() {
	var interChan = make(chan interface{}, 1) //定义一个interface{}类型
	interChan <- student{"张三", 10}
	var stu = <-interChan //读取interChan 里的值
	fmt.Printf("getInt的值=%v 本身的类型为:%Tn", stu, stu)
	//上面在运行层面可以正常输出,但下面的编译就会出错,因为编辑器不知道stu是个啥类型
	fmt.Println("学生的姓名是:", stu.name) //报错:stu.name undefined (type interface {} is interface with no methods)

	stu2 := stu.(student)             //我们需要使用类型断言方式接收并判读数据类型
	fmt.Println("学生的姓名是:", stu2.name) //学生的姓名是: 张三
}

遍历管道里的所有数据与使用for时候的注意事项:

var intChan = make(chan int, 100)
for i := 0; i < 100; i++ {
	intChan <- i
}
close(intChan) //关闭intChan管道后只能读取数据
//下面演示一个错误的遍历管道方式
for i := 0; i < len(intChan); i++ {
	fmt.Println(<-intChan) //这里只会打印0~49共计50个数据
}
//管道读一个长度就少1个,所以上面代码将会读出错误的数据
//正确读取方式是使用for range或者将len(intChan)在for循环的上面先计算出来
fmt.Println("下将使用正确的方式读取管道剩余数据")
for v := range intChan {
	fmt.Println(v) //这里只会打印管道剩余所有数据50~99
}

//定义一个新管道测试for
var intChan2 = make(chan int, 100)
for i := 0; i < 100; i++ {
	intChan2 <- i
}
chanLen := len(intChan2) //先计算出管道长度
for i := 0; i < chanLen; i++ {
	fmt.Println(<-intChan2) //这里将正常打印所有数据0~99
}

不会失锁的管道遍历:

func main() {
	var intChan = make(chan int, 4)    //定义一个长度为4的通道,用于存放int值
	var strChan = make(chan string, 1) //定义一个长度为1的通道,用于存放string值
	for i := 0; i <= 8; i++ {
		go func(intChan chan int, i int) {
			intChan <- i + i
			strChan <- "这是第" + fmt.Sprintf("%d", i) + "协程"
		}(intChan, i)
	}
label:
	for {
		select {
		case v := <-intChan:
			fmt.Printf("计算结果为:%dn", v)
		case v := <-strChan:
			fmt.Println(v)
		default:
			fmt.Println("没有可取数据,协程运行完毕!解除主进程锁定!")
			return//我们也可以使用break label跳到指定标签位置
		}
	}

}

/*
计算结果为:6
这是第7协程
计算结果为:16
这是第8协程
这是第6协程
......
没有可取数据,协程运行完毕!解除主进程锁定!
*/

上面的代码里没有close,也不需要去想在哪个位置关闭管道比较合适,所以select可以解决我们什么时候关闭管道的烦恼和解决管道阻塞的问题。case可以同时对多个管道遍历,当case里所有的管理都没有数据的时候就使用default退出当前主程序,当然我们也可以break label跳到指定标签位置.

六、声明只读或者只写管道及应用场景

很多时候我们需要声明一个只读或者只写的管道来保证数据的安全性。在实际应用中我们可以定义接收消息的管道为只写发送消息的管道为只读。这样我们就不会在数据操作中丢失消息内容。默认情况下管理是双向的,可读可写,我们如果要定义为只读

//定义一个只读管道来处理发送消息
func sendMessage(s <-chan string) string {
	return <-s //函数里s管道只能读取
}

//定义一个只写管道来接收消息
func getMessage(g chan<- string, message string) {
	g <- message //函数里g管道只能写入
}
func main() {
	var messageChan = make(chan string, 10) //定义并make一个双向管道
	getMessage(messageChan, "这是接收到的一个消息")
	mes := sendMessage(messageChan) //获取一条messageChan管道里的消息。
	fmt.Print(mes)                  //这是接收到的一个消息
}

 如果我们并发的协程里某一个协程出现了错误将会导致整个程序崩溃,遇到这样的情况我们需要使用recover在函数里捕获panic并进行处理,这样即使这个协程发生了问题,但是主线程不会受影响,可以继续执行。

 

除非注明,网络人的文章均为原创,转载请以链接形式标明本文地址:http://www.neter8.com/go/84.html
标签:GO协程并发管道 Kwok最后编辑于:2020-10-26 10:26:37
0
感谢打赏!

《GO语言学习笔记11:chan管道与goroutine协程实现数万并发详解》的网友评论(0)

本站推荐阅读

热门点击文章