- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
0 S6 Y2 l* q2 H7 _ - require_once __DIR__ . '/Workerman/Autoloader.php';7 A9 N+ k; j7 I: U4 n# a: c
5 Z" D/ v; S7 |9 j: `- $worker = new Worker();
, z% C# ~: s+ Q, n1 Y - // 4个进程
; L. S5 ]& q4 c1 d - $worker->count = 4;
6 U; @6 ~7 W# m2 V- Y0 @4 i - // 每个进程启动后在当前进程新增一个Worker监听( B+ x6 T1 B( G2 [6 A% e' M( T
- $worker->onWorkerStart = function($worker)0 s! \# i- m: s3 Z
- {
3 y" d' H f2 F$ Q( y& V3 S y ` - /**1 M+ n0 H) P8 x) Q" v ?
- * 4个进程启动的时候都创建2016端口的Worker
\. h! W1 a6 Y) I - * 当执行到worker->listen()时会报Address already in use错误
* |+ o- `2 S+ \, C# R' @ - * 如果worker->count=1则不会报错
4 H& x# [& w' |" E& M - */
) u/ t. t$ T! G( U8 K" J - $inner_worker = new Worker('http://0.0.0.0:2016');" L a, ^& P/ s" J
- $inner_worker->onMessage = 'on_message';
1 v$ m, b a( l0 j5 |/ W8 d0 ^ - // 执行监听。这里会报Address already in use错误4 m- Z; [8 P$ i) d8 \0 S* X. `
- $inner_worker->listen(); ^; Q2 T/ I6 q/ {
- };
/ O& ]& B E* L" [+ U! X" O4 m
, M$ u ~* A+ u1 ?, a- $worker->onMessage = 'on_message';6 _' Q' A* }7 t3 y- I0 j' Q
! @/ ]# R$ X3 U6 k- function on_message($connection, $data)2 ^9 b- k2 c. k% F6 A% O
- {
3 N: B/ v' f9 X" Q( y1 G) w3 b - $connection->send("hello\n");
0 n$ t& E* z O. n$ T$ i/ ` - }
8 L& {8 G+ H: s2 [9 r( m( ^9 r - : n4 q8 Y2 P2 u+ \* R# k8 }
- // 运行worker- K0 ?. S: z- K! [
- Worker::runAll();
; t% z2 c1 S" o5 n - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:- S$ |( j# a8 ? E* A2 x$ s. s/ C
- 4 j. T/ q8 v4 e" s1 w
- use Workerman\Worker;
" \! |: ]7 X8 \ x4 w. Z4 C - require_once './Workerman/Autoloader.php';
7 n6 R( c- G) a$ j8 b" A3 c' U - 0 K5 \( I6 K0 w- x$ x' _
- $worker = new Worker('text://0.0.0.0:2015');0 D4 A9 E8 s5 A( p q2 F" y
- // 4个进程5 H% S( X( F L, ?
- $worker->count = 4;7 x# c+ L' k& p2 g5 ]
- // 每个进程启动后在当前进程新增一个Worker监听% @/ U8 h0 D2 f7 A" |9 ~
- $worker->onWorkerStart = function($worker)
' ~/ v! ~4 a: V2 d - {2 n% i8 [8 f+ t2 Q- Z3 R, G
- $inner_worker = new Worker('http://0.0.0.0:2016');
, G7 z$ q. p% q8 m) Z h0 ? - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
0 E; h' G5 i% n6 k+ ~2 I8 z8 D - $inner_worker->reusePort = true;9 n" z+ o4 d+ Z" S: w2 s
- $inner_worker->onMessage = 'on_message';
1 Q, M% E: k& r# @% b" y - // 执行监听。正常监听不会报错
1 S/ }5 Y/ X v+ ]+ r" L - $inner_worker->listen();
, l a0 W" H+ r/ c3 a - };: g w. U- L3 H% @* p3 c& h
- " d, G! w4 |( Y) S; C8 E" A; T
- $worker->onMessage = 'on_message';( L8 D2 \' e( e5 t V
# Y, |" I# b/ `1 S2 ^# h8 b- function on_message($connection, $data)- O! f& g) Z) [
- {' d) q' P% \( a- B- h. Q
- $connection->send("hello\n");
9 [. n* W8 g. n; J9 [/ l# [ - }
% z" u/ Q- W3 E! \. D; t
( E+ _4 y' B/ B3 K' \- // 运行worker/ L7 E& Z- k! z% X4 O
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php- `) M/ L! t) \& ^, G9 `# V
- use Workerman\Worker;
$ M$ k* B8 a9 q; S) o% B$ Y, G1 J - require_once './Workerman/Autoloader.php';
; n1 c/ ~+ L' z/ | - // 初始化一个worker容器,监听1234端口
1 q9 q9 a3 o- H- M) ?4 W - $worker = new Worker('websocket://0.0.0.0:1234');
q2 H+ v" w) u- h; a9 f. v8 `
& G8 w" b9 r" n) @7 X- /*
8 m" n8 C, v6 l, D - * 注意这里进程数必须设置为1,否则会报端口占用错误2 @6 l) T2 P5 n
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)7 T8 ~ t, @! O X
- */
- K7 k+ M/ n$ X( ]& l" v3 _# L. b# K3 A - $worker->count = 1;
) c7 K8 K4 H( [- y+ }+ P! p - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口) F7 t; O* G5 e
- $worker->onWorkerStart = function($worker)
$ s& U& s: A% p1 l, j - {
; C0 E: ]5 M `: { - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
# U6 r/ ^" b6 d1 h2 C - $inner_text_worker = new Worker('text://0.0.0.0:5678');' g! D. X. y7 J( u1 e8 J Q9 l
- $inner_text_worker->onMessage = function($connection, $buffer)
) Z: `9 ^4 H* _0 ^6 j( C - {$ _- k- g% @+ j# n/ v
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据, T7 ]: R' D8 D9 K. m
- $data = json_decode($buffer, true);
" y6 } f+ _( Y. T$ U" E - $uid = $data['uid'];
1 y' ?9 q j, R' z3 v7 s& g4 Q - // 通过workerman,向uid的页面推送数据0 c2 F8 B, F1 V! C, X1 k
- $ret = sendMessageByUid($uid, $buffer);1 H" G2 Z- S1 m9 I" c
- // 返回推送结果
- j& y- l! A& L, _ - $connection->send($ret ? 'ok' : 'fail');
4 W% x" a! p$ T1 D0 H; T: | - };
# I3 { B; M7 H/ M - // ## 执行监听 ##
! w Y5 k* j7 o4 C ?7 B; w- A% A3 y - $inner_text_worker->listen();
7 l/ \0 c& K- S% Y3 l - };: P8 k+ v. ~5 T8 M+ F2 e3 M: Y
- // 新增加一个属性,用来保存uid到connection的映射
- q* t6 O3 K+ G9 z6 j8 o0 c5 x - $worker->uidConnections = array();- {2 |! ~7 x* w- n4 x# x
- // 当有客户端发来消息时执行的回调函数9 r' Q3 b' h1 ^: \+ |/ \; J9 m
- $worker->onMessage = function($connection, $data)
$ k) S3 J* h1 E# R1 s1 _0 l" u3 V - {5 X$ }- v u- S$ e6 s. N( m* A
- global $worker;2 t8 l$ n) h$ Q* `; G6 @
- // 判断当前客户端是否已经验证,既是否设置了uid7 a/ r+ P# y: e: k6 V2 I
- if(!isset($connection->uid))
: _* l) g" r$ U2 u% E - {, d& E* b' q, m& r; g& p
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
: `; Y; `8 e k' C/ r - $connection->uid = $data;
8 ]$ q2 K& C, O6 q3 G) c/ S* d4 c - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
) h1 R& H8 Y& T" h6 Q - * 实现针对特定uid推送数据
V3 K- b% Y( f7 K5 a" p4 o - */: z/ p( \$ a8 q! w$ e) S
- $worker->uidConnections[$connection->uid] = $connection;
1 B6 O- V" k8 H* k - return;
4 T0 k' A# o0 T& Y - }) B$ t8 @+ |% U# a" S U
- };9 R! R8 W) q& ?! `
6 K3 o9 _( o+ e- R& l" o: I& E1 @/ I- // 当有客户端连接断开时5 k, v+ a7 \' P& s$ L$ ~
- $worker->onClose = function($connection)9 a* {0 m! B4 L+ V `6 ^
- {0 @2 E9 {1 z& Q
- global $worker;
0 [8 e3 o% I9 V5 x1 B! G - if(isset($connection->uid))2 h3 U8 y$ n4 K$ E+ Q
- {. Z4 [- {9 m/ @! B' Q9 V; a9 G
- // 连接断开时删除映射
& A/ @% u% s! R4 I3 w1 o% Q! ] E, I - unset($worker->uidConnections[$connection->uid]);! H: v; \9 `% c9 B- D' V
- }. Q- [9 [ X h* R# o/ j
- };
2 v5 B, R! t1 ~ C
- t* Q$ K: O' }) |$ w- // 向所有验证的用户推送数据
6 O( ~& K/ d6 U6 J1 ^$ x8 r9 K - function broadcast($message), h3 R" a" x" R; V
- {
2 R& v6 s( J# e9 A8 p - global $worker; V5 r+ D; K9 r
- foreach($worker->uidConnections as $connection)! N5 l" b5 c6 c7 o( }8 w. o. e2 A9 {" W
- {
( B7 ]: S) Z4 T9 n$ ^4 C - $connection->send($message);
E8 V, G. d# l - }; u, f( U- y _ @0 U
- }- s/ _6 @8 g6 v; t, Z
O* J) E0 l& \! ? q. y% s- // 针对uid推送数据; e0 @3 r& h2 _+ m( X
- function sendMessageByUid($uid, $message)
* M' b% [+ q, Z) Z U - {
3 t9 Q& ^+ o+ N! E1 _ - global $worker;
( Q# K$ p+ ]/ w9 O - if(isset($worker->uidConnections[$uid]))
1 {' x* x+ x3 k6 W7 y - {
* N }# `8 v7 E4 h9 b4 g - $connection = $worker->uidConnections[$uid];
6 m+ S0 R2 \- z# K- S - $connection->send($message);
5 R! _) m; ]) l% I0 ^- q U$ ? - return true;
4 }+ d% `2 M% F! }7 v- W - }
6 K& x8 N. a0 u2 t0 v7 ~ - return false;: U' V# v/ g1 y! N5 D; X
- }
7 \7 X( ?3 M3 D, i0 w
g; Q2 a- e. u% k$ ]9 [# C* e; Z- // 运行所有的worker
' k" H( \% p6 d* d - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');8 \' y7 R1 ~3 W& }8 j9 y
- ws.onopen = function(){
# U2 ]- ?0 q( c( v9 i - var uid = 'uid1';
2 E; K/ _/ h, z; [) o - ws.send(uid);8 N; Y' E3 Z. O* o; q4 w
- };- h3 A2 y) T0 [# M$ K
- ws.onmessage = function(e){/ D1 | y' h& S0 L
- alert(e.data);
2 m9 T; l: G2 | - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
t9 e9 {& y P" V8 R - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
7 T2 Z0 s+ `1 ^ - // 推送的数据,包含uid字段,表示是给这个uid推送* z `; f& ~8 q9 G
- $data = array('uid'=>'uid1', 'percent'=>'88%');
( B; ^. a1 w8 ]9 X3 M' W - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符0 n, c+ S& p- w+ k$ M
- fwrite($client, json_encode($data)."\n");
/ E3 r3 L2 o& M8 i, o3 [ - // 读取推送结果, d5 q2 C; J4 L& r
- echo fread($client, 8192);
复制代码 - z/ v* y* z- f( u8 q' t
& e/ H, }1 c. c/ P0 I3 v |