- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
% d' J4 s, P5 |$ T3 ]2 ` ?: | - require_once __DIR__ . '/Workerman/Autoloader.php';( R& P! U' w' @
5 U! Q" l+ O1 K6 m- $worker = new Worker();; s3 F3 s1 W. x/ v- W
- // 4个进程
! j4 m& d9 e$ m6 r# H: V( @ - $worker->count = 4;% i; \' q0 H+ w4 `/ D% a
- // 每个进程启动后在当前进程新增一个Worker监听* M6 l+ p& g8 U- b( ~( [* l5 L
- $worker->onWorkerStart = function($worker)+ p6 ^8 J( x1 L! \4 F
- {
" N' f2 J) n9 S9 R: ^! Y1 E) ^/ Y - /** |2 ~8 l; l/ v5 A1 k( Q$ f
- * 4个进程启动的时候都创建2016端口的Worker3 _# }$ p$ i" a+ [! n/ s
- * 当执行到worker->listen()时会报Address already in use错误
6 L# o8 Y2 h1 G$ U+ B - * 如果worker->count=1则不会报错
`9 f% L& w" C: [ - */; `: G2 o1 j+ P# Q5 Y- x
- $inner_worker = new Worker('http://0.0.0.0:2016');( I5 v, j' d* S+ Q
- $inner_worker->onMessage = 'on_message';
* K b0 r# `4 P8 y' L. X1 E- p$ @ - // 执行监听。这里会报Address already in use错误
+ l. @6 T; u8 ^2 m+ e N - $inner_worker->listen();
. d5 G) X5 |- _" l' o6 Z - };
, D/ z9 a1 u, l
; b0 e+ u$ p$ n( @4 V0 P- $worker->onMessage = 'on_message';* P0 U3 b6 E5 Q6 G* f6 I3 ^/ C
( h; ^! x1 o( A" r' z- function on_message($connection, $data)
) L& D) }, P; m0 | - {; Y: u# g6 G( i' k3 H, b, r: a
- $connection->send("hello\n");* ]: t% i! U1 K1 E o3 B5 n
- }4 Y/ g6 V+ q' Q+ q7 b0 g, d% l1 n
- 2 e1 F+ J6 x4 O S% r
- // 运行worker3 E$ n! [: o0 i6 J% W+ I- z$ j
- Worker::runAll();
# N% O& q+ f2 g - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
6 l, L R7 W4 y/ h - ' _$ i4 ?' }( @9 f' p$ h
- use Workerman\Worker;! D6 Z* s$ D" G9 ^& s8 u
- require_once './Workerman/Autoloader.php';8 g1 T' B* {8 t, T. l8 w" u6 ~
- & B3 \" j: `1 h3 R
- $worker = new Worker('text://0.0.0.0:2015');: M" C& l6 K# ]) E2 c2 V8 ]
- // 4个进程
6 A( y8 f# B% o0 L+ o3 z - $worker->count = 4;; [3 B* h3 @! J% g
- // 每个进程启动后在当前进程新增一个Worker监听
9 [+ }4 c7 Z$ Q - $worker->onWorkerStart = function($worker)
. P) s0 E. z% C* h - {* O2 ?; @" L q# `
- $inner_worker = new Worker('http://0.0.0.0:2016');
2 s7 A _8 W: Q- [8 x' {, k, ] - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
; p6 R0 D" E* n - $inner_worker->reusePort = true;
! j: u/ o+ G& x% R7 s - $inner_worker->onMessage = 'on_message';
F8 B4 d& b8 a9 y j( _. `* Y - // 执行监听。正常监听不会报错
A8 z5 e0 q( t: X - $inner_worker->listen();
. N" [4 U5 E5 Y" M; J - };2 w) y5 ~) {3 }. y& K
- # _' T# e7 d$ ] g
- $worker->onMessage = 'on_message';9 c+ U9 z5 M {1 [( E8 W
- 1 h6 E3 Q2 {: w4 r; E5 P% j7 X9 Z
- function on_message($connection, $data)5 I# t" {. J4 H' V& f3 q( M. k
- {
6 q" k" Q/ V2 B2 ?& F - $connection->send("hello\n");' g9 p" b, i% P% x! n
- }
/ s; H8 V5 e4 V6 f. u - 4 M. w' t& a8 u3 p
- // 运行worker
- p, J6 A& e- | - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
( m, H: U0 |" W) e, V- o - use Workerman\Worker;# V( P$ S4 G7 Y, m6 k
- require_once './Workerman/Autoloader.php';
# P$ \/ q7 {9 B - // 初始化一个worker容器,监听1234端口
# T' W0 u+ j9 l1 Y - $worker = new Worker('websocket://0.0.0.0:1234');
- r! Z( ~: L2 t9 W
. D, x0 C s/ c6 @: T9 U- /*2 ` ]9 z- E2 Y# e) p! l6 ~
- * 注意这里进程数必须设置为1,否则会报端口占用错误
* {, j; Y! y0 }+ U1 m. s - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)% s4 z b1 m4 Y$ W
- */
, r. o2 c0 @1 s# X - $worker->count = 1;5 [( m2 S/ }0 W, u2 g! k" X# V# S
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口8 B, ^) M/ x/ P" m, r/ F
- $worker->onWorkerStart = function($worker)
( A1 B# J. k# D0 L( ~$ t) t; C - {& R& l) {2 J. \) J
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符5 i/ [ q! |% l
- $inner_text_worker = new Worker('text://0.0.0.0:5678');! p1 N5 ~& [8 K/ O
- $inner_text_worker->onMessage = function($connection, $buffer)) T5 N( F3 |; n& D9 k0 A& q
- {
9 U! }( [) K0 f3 e# i4 U3 `- B - // $data数组格式,里面有uid,表示向那个uid的页面推送数据/ u0 I- \( {$ d
- $data = json_decode($buffer, true);# q9 ~( D$ [: v$ }7 x# x- ?5 {; v
- $uid = $data['uid'];
' `( V+ [) {5 N - // 通过workerman,向uid的页面推送数据
! _2 j/ V" T9 K, ? - $ret = sendMessageByUid($uid, $buffer);- H7 Y0 }% V& E, G+ x
- // 返回推送结果) h0 r" G* q6 o6 K2 H/ E2 [
- $connection->send($ret ? 'ok' : 'fail');# B5 H I" R6 \. p
- };
5 U0 a! t6 O5 \3 U* o8 D8 t - // ## 执行监听 ##' _# X8 A7 O5 R2 o( D6 D
- $inner_text_worker->listen();
: V* W: N4 C7 U: y2 ~# W - };
" K: ^( C( x8 J& G! |! p) ^, H - // 新增加一个属性,用来保存uid到connection的映射
+ i; c' ]9 E% p3 F! q- H2 S$ u8 } - $worker->uidConnections = array();
) S# o9 L7 F( R - // 当有客户端发来消息时执行的回调函数
1 V/ M" ^5 O: j2 L9 { - $worker->onMessage = function($connection, $data)
1 h- Y; X0 Q" O - {
) d- k- J: T! t( }4 e - global $worker;
2 E! `. n0 u2 B, E+ ?. j" @ - // 判断当前客户端是否已经验证,既是否设置了uid
: Y x3 z. \0 v: @ e6 s* s - if(!isset($connection->uid))
, a9 W7 b/ w( p- q# \ - {
; c k! a6 L( U0 K8 n% N, X - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)9 I5 ~" \4 Q# U( G( \3 a
- $connection->uid = $data;
* m1 [1 v$ \) z, m% M( X: C - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
, K' E; M4 c; _' T, T | - * 实现针对特定uid推送数据
* T" H k3 u8 N1 D3 @* f - */
- v3 _0 n/ C: r' o5 I5 m, Z _$ r - $worker->uidConnections[$connection->uid] = $connection;/ a, a }; m; r' I4 h+ M! b( u
- return;' t7 l. ]' j1 d+ ^$ o: W
- } C" [: d9 ^0 F' {0 P
- };
' G# s. \2 A4 u$ Z - - V3 i, }( N5 W, G1 q8 \* V) I/ h3 e( D
- // 当有客户端连接断开时8 @8 C* d1 I0 D" m7 c2 I
- $worker->onClose = function($connection)0 m1 }+ l* ` P9 m* B* m& k+ l
- {
: e2 d; n/ c; K& N c - global $worker;; Q4 X3 m0 Z# s( q# U5 f& N8 W7 j
- if(isset($connection->uid))
3 G+ m- a6 D$ y% u1 A6 V, h* _ - {' p$ m5 G: A+ u0 u" l
- // 连接断开时删除映射* @% W- q' Y+ I9 |1 t8 Z
- unset($worker->uidConnections[$connection->uid]);
' s7 S8 l! [# z# B4 _& B - }
, r) g) \# l4 B- u* W - };
, E# o- x' [4 J
+ D9 C6 j: O' s! f- // 向所有验证的用户推送数据( Q. ^9 Z7 V, S: |) C
- function broadcast($message)
* E. m% I# t' r6 T' \ - {9 Y4 y3 W8 p F( [0 \1 ?6 Y
- global $worker;. L! ~' ?/ z+ T* r5 M, W! A
- foreach($worker->uidConnections as $connection)
" M% r, j' I1 a - {$ A. j' r# Y7 W$ f0 s+ o
- $connection->send($message);9 Z5 \% D- ?2 `
- } c; m# Y4 Q# u4 f5 v8 d4 y; b3 p
- }
$ v! R& H& X9 _/ V- }: V - # b& A' U" }1 ^% M: u% W$ V
- // 针对uid推送数据
" e# ?( e+ f& B# ~3 K - function sendMessageByUid($uid, $message): o) C ^3 D" g: `3 n
- {$ U- Q/ ^# M' \$ w! O$ u1 X
- global $worker;3 X0 z6 m/ N( E. c
- if(isset($worker->uidConnections[$uid]))$ z s- _, c7 q7 C, t. e w+ |+ k
- {
+ N8 s, z* _7 j* Y/ b' S$ f - $connection = $worker->uidConnections[$uid];& B6 R8 x7 R0 K! k5 }1 ^
- $connection->send($message);
% k+ C0 x7 Y; j) H) {3 K - return true;! M5 p9 r: l" l. e
- }) x2 ?9 n5 |# q, u6 j4 R) X
- return false;
4 c. f e {, a; Q5 v - }
5 U# T5 x, o: ?4 |1 A
0 `# t7 s4 I% y ]$ Q* |- // 运行所有的worker' O- u6 o N2 b* G0 x9 m
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');# k5 x9 e1 I$ c; G4 g
- ws.onopen = function(){6 b' F2 o, i7 e" v" H
- var uid = 'uid1';
: D* Z( A5 G- t3 N, X - ws.send(uid);
* [/ Q$ {) A) \ Z - };) o0 ~9 u; ~% n2 X
- ws.onmessage = function(e){
& w& S' x3 f8 E# Q5 b1 R - alert(e.data);5 F3 t7 v4 s: o5 S4 j/ {, o/ e9 O! N
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口: K# \2 a' h3 n7 M: ~
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);5 Q/ S. z* L6 ^! `3 l
- // 推送的数据,包含uid字段,表示是给这个uid推送4 Y* C+ V# g& S9 ~4 ?' \7 H8 L
- $data = array('uid'=>'uid1', 'percent'=>'88%');; i+ f1 g# P8 q. b- u& ~/ ?0 s
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符# F5 h, I; D- T1 |, ~/ [
- fwrite($client, json_encode($data)."\n");
6 Y+ H8 ?* Q+ i# D- k: w) e - // 读取推送结果6 n0 l* R# c8 g
- echo fread($client, 8192);
复制代码 4 {: w- X% q; P% i
; l R) w6 c1 R7 e: H8 H
|