- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;# n/ C7 s( V) r, D
- require_once __DIR__ . '/Workerman/Autoloader.php';/ \1 T4 A# f# d, {
/ C7 e" e6 v- @4 K# ]( L, M6 o) N- $worker = new Worker();
; ~5 P. v {* m5 R9 P - // 4个进程
/ s" I. i* q$ L R - $worker->count = 4;
0 w6 Q9 n, z4 q2 {9 g7 ^. U' C - // 每个进程启动后在当前进程新增一个Worker监听 t6 J/ d7 l: j
- $worker->onWorkerStart = function($worker)
* H) w' L* Q% ^& t& G D% X' s3 | - {
& v0 A: W4 |- Q5 r* ^ - /**3 n$ ~. j& \4 V2 {, Z3 ~9 R/ t
- * 4个进程启动的时候都创建2016端口的Worker
8 P5 |: y0 w* T" j5 X! e - * 当执行到worker->listen()时会报Address already in use错误8 t4 t. w9 {' L5 y. m1 k# m5 P
- * 如果worker->count=1则不会报错* D+ p3 E4 Q5 H: [8 u
- */( d# v1 k3 d& F
- $inner_worker = new Worker('http://0.0.0.0:2016');8 v5 F- U2 J7 q
- $inner_worker->onMessage = 'on_message';
6 R! ?6 Z6 e S% X& V - // 执行监听。这里会报Address already in use错误
* |9 t1 a# y" D" {: g: r3 y - $inner_worker->listen();, @* _* P8 _; W) a
- };" l9 s' E8 \4 j7 J
- ) F0 D3 W3 o1 L% K
- $worker->onMessage = 'on_message';
# U9 O$ t1 F( P* @ - 8 L* m6 |. B5 w3 p7 U) U$ L
- function on_message($connection, $data)0 ~/ @& k3 G; {$ W
- {
- O! n2 F; m, S1 f5 q - $connection->send("hello\n");' j- z l) K5 V
- }
5 u. O$ Z: O$ h) a
$ h: I& E8 y/ z- // 运行worker
+ i' r: K& Z2 F, [ - Worker::runAll();% `$ K0 K( @' W6 v/ y
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:' f! x8 n8 `# \
Q9 Y. k$ r% `. |; t- O8 o1 b- use Workerman\Worker;5 a* i& Y: R! m5 ]& W+ a. p* |
- require_once './Workerman/Autoloader.php';
+ i) a0 c! T3 y# B
) c! n" V2 k' F! ]: w" Q X- $worker = new Worker('text://0.0.0.0:2015');
9 c4 q2 N* u$ M8 r$ l8 N7 \0 f - // 4个进程/ O" Z$ s; z5 G }/ y2 v
- $worker->count = 4;# ~1 x4 D3 ~0 s' E
- // 每个进程启动后在当前进程新增一个Worker监听
2 f: M# `5 Q% U - $worker->onWorkerStart = function($worker)
& k, C9 P8 m9 h# E( O! L( v7 H4 Z - {& e& h9 o0 I! V1 |9 ^
- $inner_worker = new Worker('http://0.0.0.0:2016');! u* J. {6 e Q, u3 @+ ?0 X# Y
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)# j: t- {" p: W1 T; s# @" L" m
- $inner_worker->reusePort = true; l- F: K9 t8 W/ i2 J; M
- $inner_worker->onMessage = 'on_message';
1 S5 W2 l% W1 E - // 执行监听。正常监听不会报错
/ a5 g9 {0 T& N y! Q8 Y/ E! | - $inner_worker->listen();
4 t; P2 s B" t6 U* J2 s - };
) i+ g+ C& n$ j- {! K - 4 b' D( {, _: ~6 o0 X' o6 {
- $worker->onMessage = 'on_message';
$ ?5 d) b4 @. U' V; c E' C
$ D+ m5 S, h1 Q2 r. @0 C. _" F$ k- function on_message($connection, $data)7 m+ {! k ~- L
- {& J% @: B! ^7 G4 [' {: N1 j' U& |, k
- $connection->send("hello\n");3 W: l' l6 |5 D- ?
- }
( x- Q' d% M4 A$ n - 2 R7 q* ]( {4 g: r: I5 \
- // 运行worker2 z. n& n" y g% `- f: A
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
: P! r. `- u2 O* I- Z8 l9 A$ b8 h - use Workerman\Worker;
: \& T' r9 v/ E; q5 U - require_once './Workerman/Autoloader.php';
+ p+ v% j6 [ n4 E - // 初始化一个worker容器,监听1234端口7 ]: O3 D" E! V4 `9 w
- $worker = new Worker('websocket://0.0.0.0:1234');
! D- ~2 i; S+ ]" H
6 K5 j, |# ^% g& o( `- /*
6 a+ \! h/ k$ [/ P u: S5 a - * 注意这里进程数必须设置为1,否则会报端口占用错误
" S. h6 J; L6 O6 s1 M0 q - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)! X! j, a/ b) Y m9 @0 {
- */9 C ~# _+ M- F7 [5 O7 `8 r
- $worker->count = 1;
5 e9 _+ o1 ~# z$ ? - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口# |+ C) {% v# p2 Z
- $worker->onWorkerStart = function($worker)1 E1 x. m" b; e" }; i
- {7 }4 r9 M/ p% w9 m# V u6 y
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
4 {8 z* }( ~+ `+ d3 D+ P. R& B - $inner_text_worker = new Worker('text://0.0.0.0:5678');
, `: T7 T! _. L9 P6 C - $inner_text_worker->onMessage = function($connection, $buffer)6 R+ l( D+ t% Z! |8 C7 R
- {, u% v7 Q. `8 X! C0 T
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据
: U; f, ^2 [9 L3 \2 c, M - $data = json_decode($buffer, true);" h/ w# `! O Z% }+ W
- $uid = $data['uid'];
( C4 A) p+ h$ i/ S* Q/ d - // 通过workerman,向uid的页面推送数据
; C3 X7 j( q; _& n0 o8 c - $ret = sendMessageByUid($uid, $buffer);
+ j' ^* r, |) k Q( [0 t7 x - // 返回推送结果
* F0 V8 r( L( c; g6 z& p - $connection->send($ret ? 'ok' : 'fail');: w. P* L p# \3 C$ W O
- };/ X8 u/ s$ Z; L! E
- // ## 执行监听 ##5 t! Y& \& T- ], \
- $inner_text_worker->listen();
* z/ I2 h$ P/ z( b- C* ` - };
y/ U; }1 I" Y$ N& R7 G - // 新增加一个属性,用来保存uid到connection的映射
7 C ?4 g' x! S+ g5 m1 s - $worker->uidConnections = array();
. P& F. j8 [* F- C7 b& H# L - // 当有客户端发来消息时执行的回调函数
# @. e1 y: X+ C7 X" z& \% o' C - $worker->onMessage = function($connection, $data)
$ h! P* v5 Z4 y, y( [6 ~8 ~5 }' \0 H - {
& l/ i8 V9 d5 k - global $worker;" c: J% Q% ?( D3 n+ S0 X- o* q
- // 判断当前客户端是否已经验证,既是否设置了uid
2 i$ y- G* B" Y" Z. e* a, [ - if(!isset($connection->uid))6 C, |9 r+ X0 Z- z; N
- {
/ N* X5 T2 e0 A# }6 c - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)9 U% K$ x+ b V+ r+ K
- $connection->uid = $data;
! p; U( o% L$ m$ ~% L - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,; [) E7 O) H: B7 w+ _6 q" n
- * 实现针对特定uid推送数据
1 P$ z/ [% {6 c4 ], ]0 p3 S4 H - */
( B) t+ K( t; \ - $worker->uidConnections[$connection->uid] = $connection;
, q3 C2 K* R- ~7 `; j; {! x. k - return;
' K+ c0 P. F2 o$ i2 x3 e - }& N% p; s( y/ ^/ }6 C% V3 b. X
- };# a. k& K" C5 ~6 t" t8 Z' _
- 7 ~9 b1 n; M' g. Z" t, G" G3 y0 X
- // 当有客户端连接断开时
1 B2 z: f3 G! c( i$ {# n - $worker->onClose = function($connection)& q7 ~( P2 l; }/ _) \" p6 q
- {
& V* H. Y9 W! A% O; j - global $worker;: c& I8 m% | L! F, R0 u+ i) H
- if(isset($connection->uid))3 b& v& g" }8 Y
- {
& E( w- N7 v; k9 h - // 连接断开时删除映射4 y9 F* A; y4 l7 w% @# N
- unset($worker->uidConnections[$connection->uid]);
5 P- }2 j& R- X+ L' g - }' K6 J; k) Z0 x5 b0 z- k
- };
# r9 Z" F% r4 G% ~7 P" V
8 m3 Y& y4 E- n! q( r- // 向所有验证的用户推送数据/ P& ]3 P, Q6 E# }
- function broadcast($message): e' f' A: g2 K/ V; H# b
- {! \! J! X5 C8 ]% ~
- global $worker;# A& O1 B: F0 v- Q1 u
- foreach($worker->uidConnections as $connection): T5 f2 G+ n9 E; E
- {
9 P( h( x: }/ E& _/ ` - $connection->send($message); _$ m, I6 S, W9 ?4 a1 \ { b
- }
& y; F7 [) J% ` s2 n - }
F- [7 y& J6 |2 j& h - 6 m4 ]6 U( p/ ~7 ~
- // 针对uid推送数据1 [+ Q* z# ^& |3 a* p, R
- function sendMessageByUid($uid, $message): j) f; W- t8 L( {9 P$ i
- {" ~2 \. m& k( {% G) b! z- f
- global $worker;
9 Z. p( I, C8 O! y' ` - if(isset($worker->uidConnections[$uid]))
8 u. c1 A+ O2 R8 ?4 n# i - {
) d4 s& h5 u& m# m - $connection = $worker->uidConnections[$uid];
9 c* l6 A- [* P$ L+ N - $connection->send($message);
( s0 P- n p9 Q - return true;
3 O, e7 i# n- v4 J* `; ] - }
0 e* a6 j' T" Q - return false;- m7 d& G$ |/ \1 i0 d- B) o9 B4 \
- }
. Y5 B# q0 p' q ?# H; v - 5 E1 ]9 a# P$ X0 M
- // 运行所有的worker4 d: g' d' H7 _3 R
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
! s# G ^0 n; ` - ws.onopen = function(){& A# w$ @! y6 x! s% x, Z
- var uid = 'uid1';
- o( E- J/ |. s# P& W - ws.send(uid);7 T2 k+ q( t+ Q! w4 b2 k0 ~' R
- };
6 O, F+ w3 ~; s! K% w9 D - ws.onmessage = function(e){# x5 e! H, L! l5 T
- alert(e.data);0 ^: m7 @+ X- _& N
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
0 W+ b0 V$ b. v q - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
2 [* x, [1 m9 ]: E9 q/ {( K' Y2 N! A - // 推送的数据,包含uid字段,表示是给这个uid推送, U$ `8 m# K/ N" r% \5 Q
- $data = array('uid'=>'uid1', 'percent'=>'88%');0 J, q5 Y- P* _4 q/ F0 t
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符( G$ M1 O& X& u% Z8 h
- fwrite($client, json_encode($data)."\n");% X" @6 g+ Q( x- A3 h& p
- // 读取推送结果
8 l; z+ W! n4 t2 \8 x/ z - echo fread($client, 8192);
复制代码 9 g/ A$ @9 u. s1 e/ X" L9 i
) j/ Z4 T3 ?* I7 E5 L+ _ |