- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;0 [6 h% w m1 V+ |3 T4 q3 W
- require_once __DIR__ . '/Workerman/Autoloader.php';
+ H& Z1 {8 j! o' O) q - " u9 }7 d. |% L6 N- ?8 B" ^' P8 r
- $worker = new Worker();4 O( k, `3 S o
- // 4个进程, v9 B! ]# n1 r* |, S
- $worker->count = 4;
: v2 j2 ~6 }- \7 d8 o5 ]/ o& Q9 F - // 每个进程启动后在当前进程新增一个Worker监听
. v' O% ]$ G% W b - $worker->onWorkerStart = function($worker)" s1 O% l$ j I! i! o* Z$ d
- {5 H$ g! x. N# @* A4 @8 e$ t1 X
- /**/ {) _+ r- c; T0 r
- * 4个进程启动的时候都创建2016端口的Worker* S% D q4 {8 E" b$ y7 O x
- * 当执行到worker->listen()时会报Address already in use错误6 y; x; `, T2 E8 u' ~# Q
- * 如果worker->count=1则不会报错
9 ]& R+ X" `6 `! m! b: h1 h" g& M - */1 J, L2 Q) R( c" ^0 }' y7 b0 K
- $inner_worker = new Worker('http://0.0.0.0:2016');' ?4 F2 m i- z: G& s) l+ u; v/ H
- $inner_worker->onMessage = 'on_message';2 x: H5 w1 v* e3 [( r
- // 执行监听。这里会报Address already in use错误
& F/ Z; X+ a' d7 k+ z' t - $inner_worker->listen();8 W8 b2 d3 H) q$ e5 D+ I- X2 x
- };
: G1 I9 F5 s& M, U) c& }" z$ q
! ?" \* S" D+ w) R, J9 J- $worker->onMessage = 'on_message';$ d7 l9 q0 T: l0 s4 l, l% `
! O( P7 I" X. f) @- c: C/ E2 S- function on_message($connection, $data)2 d$ Y; l) ?) s8 t
- {
# w3 J6 c K& t B X8 H - $connection->send("hello\n");
4 }& x! a$ ?: \. x' y2 ] - }
6 C2 @4 E8 \% |5 w$ o - 9 t( C6 E2 C5 K( k1 l
- // 运行worker
& w5 p( l. }0 _# w9 }% l+ h/ Y - Worker::runAll();4 [8 t8 r& I2 I
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
j% X2 D1 A3 L g - & m: X( Q* D- Z6 x. o
- use Workerman\Worker;/ G& X; s! @$ V0 A3 j8 C; h0 [- {; J
- require_once './Workerman/Autoloader.php';7 C# @# b( M3 F
2 f$ \( u8 r& z2 H j- $worker = new Worker('text://0.0.0.0:2015');
3 m4 W0 y) Z+ F: R - // 4个进程
, |. L" [& ], C( }- m. W - $worker->count = 4;. F0 S* K+ P! F$ b% H" H3 i
- // 每个进程启动后在当前进程新增一个Worker监听" S G2 _8 a0 [$ S! L* l4 s
- $worker->onWorkerStart = function($worker)
, m- y# U! X2 p$ ] - {, t9 ?0 [) Z+ D
- $inner_worker = new Worker('http://0.0.0.0:2016');
9 E3 P. }9 L5 c0 B& |9 a6 O3 S" D - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)6 J) B$ a8 `0 ~3 d: b( @& L
- $inner_worker->reusePort = true;$ @4 `! h+ C$ j; Y4 _% n( a \# m
- $inner_worker->onMessage = 'on_message';
' t g( Q) ~9 j - // 执行监听。正常监听不会报错
3 Z- I! H: a5 v& @7 W' ^, y - $inner_worker->listen();
0 r* ?9 a" C& @ ^) X: @( H - };# l7 t1 |& a8 @" `! x9 w
- 2 o; b' b) R ]5 i) x
- $worker->onMessage = 'on_message';
0 {$ Q4 \& D* v+ y - # _2 m8 H/ f# E; Y- c) r v3 l
- function on_message($connection, $data)8 P8 q h8 `4 S3 Y- d b4 v
- {3 v. H. s3 c* o, B9 j. @
- $connection->send("hello\n");
. S2 k' a' r( R, ~ - }
% I- T0 o b: ]. K* M - 8 m( V B2 Q3 Y, e% ~; G# x
- // 运行worker; B8 V* n; a) m& J
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
1 Y( _0 V6 k4 _: I8 ^4 W1 ] - use Workerman\Worker;
3 _; W' B) I! h7 ?9 k# l - require_once './Workerman/Autoloader.php';
( Z# S! D" l5 d- R0 A1 N1 ~0 { - // 初始化一个worker容器,监听1234端口$ ?0 I' ~0 h0 h( J
- $worker = new Worker('websocket://0.0.0.0:1234');
% Q T4 k+ J& z
9 a) H% \& i7 D! z0 C" o8 s7 e- /*" j) x* D) ?. H" @6 B$ S( m
- * 注意这里进程数必须设置为1,否则会报端口占用错误
, y& P4 w( C, o/ m) g - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
( d5 W* ^6 f0 d; \) q - */
1 C' m' J' T7 q, @8 p - $worker->count = 1;+ [; V5 `) o+ L h
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
! w8 O4 w0 [9 |4 T - $worker->onWorkerStart = function($worker)% ~, B" F6 |% y! p0 j# p
- {/ h' a1 q: _2 n) u
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
; o( J6 e3 V+ R( C& k- m - $inner_text_worker = new Worker('text://0.0.0.0:5678');" c y y5 w5 q% ?# E4 ~
- $inner_text_worker->onMessage = function($connection, $buffer)
& g) g; k& }# X( c9 D- R' |4 x - {/ H" k# @- f& W2 x8 X% j: y
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据& X$ ^9 r2 N% W7 P
- $data = json_decode($buffer, true);/ |4 H6 s# n( j
- $uid = $data['uid'];" i1 ^ d+ U& {' T
- // 通过workerman,向uid的页面推送数据
) y2 d3 @* V' h2 S8 W& R$ d - $ret = sendMessageByUid($uid, $buffer);
2 U+ h% H. f5 q - // 返回推送结果 F) ^1 Z; @- y4 f
- $connection->send($ret ? 'ok' : 'fail');% t, c* L# D" u8 {+ T- B
- };5 I. Z( D/ D3 l, D* ` R$ `
- // ## 执行监听 ##' D1 { p" K, ?
- $inner_text_worker->listen();, d- E1 ?, Y7 o4 J+ O# b/ |
- };/ i4 {2 p5 x) X5 P9 S
- // 新增加一个属性,用来保存uid到connection的映射0 g3 h- h f/ t4 h7 a" m4 U! _
- $worker->uidConnections = array();! i0 H a4 u# S
- // 当有客户端发来消息时执行的回调函数8 |7 {" v( _; @7 Z" t. O3 y
- $worker->onMessage = function($connection, $data)
! h& q1 ~, }9 S+ k) Q; ]9 Z( t - {
' O! G2 Q9 ]) N0 y0 B - global $worker;9 M" \ A4 N/ l0 m/ R
- // 判断当前客户端是否已经验证,既是否设置了uid
3 S; Q3 |3 F/ Q7 `# Z& E - if(!isset($connection->uid))- b! h; x+ \8 Q3 J
- {7 o/ ^" H1 L) R# A$ F# g2 R! W
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)* i$ y. @! H2 D9 [
- $connection->uid = $data;9 z( R) E _# \1 p! b
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
% [4 t T0 ~. H - * 实现针对特定uid推送数据
. @, _; M0 }1 W" i8 {) ?# } - */: y9 @# h2 `$ m! `# }! B$ ]
- $worker->uidConnections[$connection->uid] = $connection;* v* X# c& x F5 ]
- return;& Z7 g0 e: L: M/ t3 l
- }3 ^# `$ M, W: n5 T: K6 u
- };" Q) y$ u* ^$ v2 N0 g$ x
" x, `' N% P. _' {) N1 d' F6 l# O6 j- // 当有客户端连接断开时% T" J y3 A* ^: w9 G; `
- $worker->onClose = function($connection)7 P. h( {; o8 l/ w
- {& W- v4 N+ J( X# K$ Y
- global $worker;7 s1 L2 K/ a6 Z9 t1 b; _& x
- if(isset($connection->uid))
/ t2 F; z# ]) g( G' y' h# m - {
0 |; V4 G! P" f0 _ - // 连接断开时删除映射
! Z6 j: I5 ~. n7 I$ V4 h - unset($worker->uidConnections[$connection->uid]);
# u) q6 J0 E8 t* b - }1 s! s0 ~ i8 t5 |& x' S
- };
* D. j4 Q* U$ N5 h2 s2 k) Y
% c% F0 {! ?( {& K o* a2 t- // 向所有验证的用户推送数据
0 r& M( q! c) B" {0 B8 g$ { - function broadcast($message)
7 g/ `0 a, W6 Z) F p) g - {4 C8 Y1 q% P: _# ~; a# s
- global $worker;
8 K2 b9 {5 V. q - foreach($worker->uidConnections as $connection)
# D9 k: b, r; w- @* A, A4 o - {+ t9 H3 }$ O, Z5 F" U6 Z
- $connection->send($message);$ q2 i' W4 y1 s {
- }
1 r: I; g1 p( k - }* [% G/ N' [: {% j( f
- 3 o [) ^" C8 _3 R: M% m# Q, B
- // 针对uid推送数据
6 m! P; U; ^" D - function sendMessageByUid($uid, $message)+ z# f% a/ R3 Y, H
- {; @4 d5 s3 o% D3 I& i& E7 y
- global $worker;
' n* c2 Y& q0 f' R - if(isset($worker->uidConnections[$uid]))
/ q9 s0 m# K2 c) {0 V' ?8 U$ @ - {
* {" r N2 Q( |: _3 \1 m - $connection = $worker->uidConnections[$uid];
4 b: z+ D/ B! U4 Y - $connection->send($message);! n% v a& U4 c! i1 U
- return true;
0 t! i1 N4 X. W- f - }
2 {; S+ Y9 X* |4 | - return false;0 ]4 ~8 q% y- t# `( G9 ?
- }
- \( w5 U3 J* g/ }2 x - - l. w; E# o0 G* e7 c f: Y! ~: ^
- // 运行所有的worker
+ {. `, T- g! D! N: U! C8 o - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
! l% \ c$ F9 t* v - ws.onopen = function(){- l" A& C' H7 N1 q
- var uid = 'uid1';6 U3 b6 n0 r% p' f1 F7 e
- ws.send(uid);
0 J0 _" u" j/ u# W4 U J - };
8 n2 { O" g6 ?6 W; ^0 W! x - ws.onmessage = function(e){
6 ?, B1 E R7 ]' @5 m - alert(e.data);
+ i- O1 \; G- y% p8 D, j - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口' x" \" j, ?6 S5 q! v4 U9 c& l
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);- b- ^5 H" I9 f3 J% [
- // 推送的数据,包含uid字段,表示是给这个uid推送
+ Q2 V$ v5 `! h' T8 i! [2 T; e - $data = array('uid'=>'uid1', 'percent'=>'88%'); U D, Q5 `1 ?* a9 a: w
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符" g8 _5 K( E2 E. g+ M. K
- fwrite($client, json_encode($data)."\n");
' l* g8 N I4 N - // 读取推送结果
5 u+ {8 r# |2 }0 |& S( S5 G - echo fread($client, 8192);
复制代码 % e( T( r. ~9 m% R/ H( I2 J
' l* x5 b9 V. ?& i8 f |