- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
8 |. `) x% |: I+ e) j! }" c - require_once __DIR__ . '/Workerman/Autoloader.php';) R2 b4 {, Q) a& g# d6 m% F
) W" z3 T3 g- Y, H/ A6 W- $worker = new Worker();
0 P) X+ Z6 \. O4 y& [3 D7 \ - // 4个进程
' I* ~1 S# T, t% O0 U# z, F1 f - $worker->count = 4;
- d" q0 g5 D: u+ F! r1 d5 y - // 每个进程启动后在当前进程新增一个Worker监听
/ t; i! p# f6 {1 `- k - $worker->onWorkerStart = function($worker). l0 p0 K r2 k2 ?7 O$ C, T ?6 P
- {
0 H4 y( _* O; D# l% C1 q - /**
7 ?9 f3 \! o0 [$ x% X; |2 C - * 4个进程启动的时候都创建2016端口的Worker
8 T7 {+ Y! M4 h- a3 G* P8 o" t - * 当执行到worker->listen()时会报Address already in use错误+ _0 K; H6 a, o: l+ w2 _' m
- * 如果worker->count=1则不会报错4 f1 U+ [: @3 w* f J6 q" Y+ Y
- */
) n) o% `( P2 ?2 Y2 P - $inner_worker = new Worker('http://0.0.0.0:2016');" ^* v8 q4 L" G7 A+ `) {( [8 D x
- $inner_worker->onMessage = 'on_message';( q8 f) [9 {1 i1 c) m
- // 执行监听。这里会报Address already in use错误6 T: E: k7 `- V! z8 r( l1 t
- $inner_worker->listen();
2 V$ {8 J) ]+ @. t k' O2 _5 s - };. s& G% r/ K* s
- $ D' B) T% Z4 t+ h C, m- k* I
- $worker->onMessage = 'on_message';* V0 f8 y0 g* X% B9 l( h8 u6 R
- $ I8 ~- o# M# m) X, f
- function on_message($connection, $data)
- D% L. k! g$ c, T - {8 Q; f( A7 m, O9 a3 P2 R' }
- $connection->send("hello\n");
2 \! W9 l \5 M, B - }) e+ G! i* l9 i* |2 k
- U9 Y1 K, C, p% g/ ?8 ~- // 运行worker
( K9 L( h' ]& }& s% E: q; {* f - Worker::runAll();7 F5 R$ `3 j3 \% |) T5 d! r
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
' i/ a6 B+ }) }; f3 E, e - / s$ h! A; {- B! `& `! w9 S
- use Workerman\Worker;
' m% v& {$ H0 `( s) Q8 k - require_once './Workerman/Autoloader.php';* J2 R" ~0 @& E. y
- & v! D" Z. f* b8 C8 P
- $worker = new Worker('text://0.0.0.0:2015');
4 z' Y3 ~4 L A& x1 ]1 | - // 4个进程- K: g! i; E- r6 X" ^5 f! r: i% Q
- $worker->count = 4;
$ Y5 d- }! H$ U9 ^0 \( q - // 每个进程启动后在当前进程新增一个Worker监听- ^# x9 t' R7 |- r1 `7 q- _
- $worker->onWorkerStart = function($worker)
% s" t9 }+ c5 G1 y- x - {
; @+ G3 [7 I; _4 ] - $inner_worker = new Worker('http://0.0.0.0:2016');9 R1 q! k `3 q7 Y8 z( \
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
1 b* C4 c% S) }, o9 H8 R2 d - $inner_worker->reusePort = true;! P9 Z; S' Z9 _4 W
- $inner_worker->onMessage = 'on_message';
4 I, N { j% y" m - // 执行监听。正常监听不会报错
: i& @7 D3 B7 C3 V. o - $inner_worker->listen();8 m m) X8 Z) r! a3 t2 i& Q
- };, P1 ]9 |0 B( m6 O% e6 R1 s/ z
F1 a- \* ^/ P- $worker->onMessage = 'on_message';
" p7 u$ j% y" O, s
' n) g" f' d+ C: M& p- function on_message($connection, $data)( Z* j ?7 H8 d7 q9 b' r. `% \$ `: {& c
- {& @) J# ^% x+ d) W) x
- $connection->send("hello\n");
5 D m! ^ a6 }) Z$ M$ a; ^) S - }" h3 V. m2 [- |! \4 {
- 2 w4 O, Y8 e# a( g2 o8 {( w( C
- // 运行worker
. k0 B' A! R9 D! _ - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php, B7 R3 C, k8 d8 }1 H) [" L9 g; {
- use Workerman\Worker;
! L) N+ N6 }+ R O' s2 N - require_once './Workerman/Autoloader.php';
1 x8 V, |, l1 |4 x( S3 A - // 初始化一个worker容器,监听1234端口1 H7 {# t! b# S/ z9 o. ]' x" @
- $worker = new Worker('websocket://0.0.0.0:1234');! X' f6 `8 i3 y+ |3 K8 i# `
- 2 K- E# ^' g* Q1 C
- /*
8 j* [ i7 j% N1 e. d; n5 d - * 注意这里进程数必须设置为1,否则会报端口占用错误
- V# l, c3 O( M2 m- o6 ~ - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true); v* ^2 W) V. i( t0 x2 B
- */
! ~1 H# n7 K+ ?# B - $worker->count = 1;
4 w) y/ l( Y |3 Y* } - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口" O2 ?! B' r# w+ K* ^
- $worker->onWorkerStart = function($worker)
. z8 Y& O6 ?0 c8 x' i' n - {9 m" `# L% t; B4 L9 S
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
" d& ^4 F5 x! d8 L" d. K4 y( @ - $inner_text_worker = new Worker('text://0.0.0.0:5678');/ o& c! r g7 m& s" ^6 f0 S
- $inner_text_worker->onMessage = function($connection, $buffer)- W; C a0 y R0 K* e: F
- {/ {/ y, K8 c" C9 Y. [
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据
* i4 P/ g7 |0 g( Z6 f5 _4 g d5 ? - $data = json_decode($buffer, true);6 _0 @; f, n. I6 [
- $uid = $data['uid'];
( T" u6 y9 k; \& e" { - // 通过workerman,向uid的页面推送数据
) b' N* q. q- D, g - $ret = sendMessageByUid($uid, $buffer);5 g3 v( u. L, G
- // 返回推送结果; |3 C6 l1 x" I6 ~" w. [ d* w
- $connection->send($ret ? 'ok' : 'fail');
/ u2 E0 _, T1 A5 ? w* l. r4 x0 n( F& [$ N - }; K5 k; n' v- t% f0 g$ }1 I
- // ## 执行监听 ##
3 ?- k) F7 Z9 ] - $inner_text_worker->listen();
% I- w) t" w m - };
% n- d# A3 K4 @7 a& Z" X - // 新增加一个属性,用来保存uid到connection的映射
! r) P# z0 U+ s: m - $worker->uidConnections = array();
# t {% u( }+ W- A% b - // 当有客户端发来消息时执行的回调函数
( @. K! P; k8 ?4 M - $worker->onMessage = function($connection, $data)- }' X% G( d6 N+ E! u `+ M
- {
! C; C! S& R/ K) c. o - global $worker;5 X4 c1 L. q1 |9 i; d' i! B3 N' _
- // 判断当前客户端是否已经验证,既是否设置了uid% Z; p- c; W$ |8 B1 H. p/ E3 G
- if(!isset($connection->uid))0 G3 I7 J* x7 }; F4 o: g
- {
9 d! u: O* N. p/ k, h - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证), [9 Q& G' U+ f |$ P& N
- $connection->uid = $data;8 c: h4 E, Q2 I5 E) e) c
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
9 r+ S: _1 u7 \4 J' ]3 v8 ] - * 实现针对特定uid推送数据; M( o$ e2 m% t$ J/ I# c5 A$ `# Q9 ?4 n
- */
+ ]: Y1 a' X. R+ I/ V" I - $worker->uidConnections[$connection->uid] = $connection;
; W c! j' O: E8 v6 ]2 {8 E9 s - return;
2 ]4 d& n' \& r7 S& f2 H5 j - }- a" @! Q* v0 ?
- };
( S8 L. Y) b3 N H% {8 z - 5 z: u- G2 D" X/ U3 l
- // 当有客户端连接断开时
& c/ v, ^/ ~# a( f# F: q. f6 a - $worker->onClose = function($connection): g; D. m7 b2 k, V
- {1 T4 A0 v- R8 a P8 c
- global $worker;
& Z& d/ F4 F* |1 P; n: [ - if(isset($connection->uid))
! g7 Y! k: J/ l7 i e7 h+ \/ U - {4 r6 x2 t) P) R% O, c+ x1 ?
- // 连接断开时删除映射- p9 h- @' I- e7 k
- unset($worker->uidConnections[$connection->uid]);
, V; d' [9 P. B% C0 q- y% S - }
4 j$ }' ]/ W" r% `$ F - };" @2 o1 V4 m- E2 o3 W$ u/ J
- : v2 ~* Z7 O2 h3 t* A. C
- // 向所有验证的用户推送数据1 E; }* I7 ~8 ]% d# h, ^8 j- w
- function broadcast($message)
. P* |/ l+ U* M - {2 U0 N- H7 G+ L% v T
- global $worker;
+ N; l( K8 J g s( s6 g5 m - foreach($worker->uidConnections as $connection)7 m! u% v. O9 Y8 a
- {
4 f0 n8 l" U9 C - $connection->send($message);
/ s2 n3 R& S0 m6 z3 i v - }
* F4 [' P6 _7 l1 V+ S - }
y2 k/ q1 ]4 J - / |* x. F0 {9 L' e* T, R
- // 针对uid推送数据
5 p/ o. i* k- O/ d0 b - function sendMessageByUid($uid, $message)
8 p7 l* g9 Z3 r - {
1 L3 j* o: ~+ Q8 e/ X D( A1 d' q - global $worker;
8 }) y {( `/ D; t9 w+ P/ u+ H - if(isset($worker->uidConnections[$uid]))
2 ]: l% O; Q( ^) F: v' h - {
, Z* ^6 _7 u5 ]1 }/ P7 l - $connection = $worker->uidConnections[$uid];& c; L; }. U% r; K' r
- $connection->send($message);' X$ i; ~, Y, P: l. t) m% D" P* T
- return true;3 L- X# m O2 M4 o3 f& s
- }' O p* X& {) r+ ]7 F" R
- return false;4 g7 n) y1 w6 w8 b! w5 \ `7 ~' V* P
- }0 }8 x4 I4 q* s$ R3 D
- % b1 `( a) K6 n, \
- // 运行所有的worker
5 z6 f1 E& o/ m2 {/ N - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
2 H. [0 o2 l3 ?8 _# n3 Z* z - ws.onopen = function(){+ o y* i# |2 Y5 S
- var uid = 'uid1';! v; O$ c$ m8 w _9 @/ X
- ws.send(uid);$ t' i7 b( V% {: Q! t) o9 u
- };
* J$ I! c& L; U8 g - ws.onmessage = function(e){
! G1 D0 o u5 v - alert(e.data);
. b) `$ B1 \( e v% ? - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口2 d* X& x' C f+ L/ L) Q
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);* h% t4 c) I6 V
- // 推送的数据,包含uid字段,表示是给这个uid推送
0 j2 f( h8 {$ u( z% u) E5 q - $data = array('uid'=>'uid1', 'percent'=>'88%');! Q1 o2 P4 j: {$ T) R
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
! z, [# E, A0 E6 n - fwrite($client, json_encode($data)."\n");& V9 X5 x$ b: [% a' T0 L5 @. z9 U% q" W
- // 读取推送结果
- z% b" n# S7 y$ I - echo fread($client, 8192);
复制代码
/ _; T# L0 B$ A8 N7 _) |; O5 u, ~- F5 v: c9 Y
|