- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;; h. B- J9 p p$ D) k2 Z
- require_once __DIR__ . '/Workerman/Autoloader.php';
! e" A: C; M2 c
: _ U1 B4 Y' u1 @+ R- $worker = new Worker();
& W- y3 x+ s- I) O( i2 W, l; w - // 4个进程6 m! q7 L$ X) v/ \
- $worker->count = 4;7 O0 E! Q; \7 x! W+ R4 F' h
- // 每个进程启动后在当前进程新增一个Worker监听( o# @. ]# \# ], j6 P% v
- $worker->onWorkerStart = function($worker); U# \3 \: h6 T/ t" E$ h0 h7 s9 R
- {7 X* ~. J( a+ p
- /**6 K+ Z3 y2 e2 ~) Q/ P
- * 4个进程启动的时候都创建2016端口的Worker$ m: ]& u4 h$ Y
- * 当执行到worker->listen()时会报Address already in use错误. n3 o- M6 G$ Q5 `& q
- * 如果worker->count=1则不会报错
3 @& h1 N, k% H4 W: x. g0 Y6 E9 S1 z - */
c# Y# G& ~" f: Q! x q& ~ - $inner_worker = new Worker('http://0.0.0.0:2016');0 i$ Z$ e* c4 T4 S- k
- $inner_worker->onMessage = 'on_message';+ A/ {' }7 H, |; D
- // 执行监听。这里会报Address already in use错误0 N6 s L$ K) \+ b
- $inner_worker->listen();/ P4 N3 h8 G6 }
- }; ?# }1 { h4 h( e# I0 ~( s8 ]
' L, R" F) \( j, b& b) f; t( A- $worker->onMessage = 'on_message';+ z' P2 N: \" ]& Y% h ~3 C1 ~
; f, }. O5 I" q& P- function on_message($connection, $data)* H+ [ Q; ^" O; C2 C
- {8 W( B, W* a A W5 E/ d9 d$ c8 \1 g
- $connection->send("hello\n");
3 O8 z" G# R9 n ?& ^4 N% z0 e - }- l) Z+ J$ P5 }5 Q. N5 d+ i
% y* v) f: i8 I, h) Q+ z& l- // 运行worker5 L+ r0 s& H( [4 f0 S
- Worker::runAll();2 C, s* {, s3 p& [1 u8 f6 `# F
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
+ q) z% u! C5 I' t - ) V- w7 v& t- z# @4 n0 a) b
- use Workerman\Worker;
2 O6 b8 [; K! q% w H/ k/ W2 {; X: s - require_once './Workerman/Autoloader.php';( U8 z: ~; d. v" l
, u4 y" L, ^% r$ A& b9 Z5 d' g- $worker = new Worker('text://0.0.0.0:2015');# R4 _4 X7 q( _* v1 Y
- // 4个进程, ^% w, a2 q9 a7 _. U! M! n
- $worker->count = 4;
- L/ y- ^& k8 O: m3 ^0 [: p - // 每个进程启动后在当前进程新增一个Worker监听
) i$ y5 d& i* s# K- i - $worker->onWorkerStart = function($worker)
) ~2 x8 g. E1 p; ]/ W - {" d2 |" g" X5 l" q1 G. l2 E# P
- $inner_worker = new Worker('http://0.0.0.0:2016');
. n( c; m8 H! T - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)% y# Q! v9 h/ V! C$ a6 T
- $inner_worker->reusePort = true;
6 ?/ ?% U) E$ {- M* E - $inner_worker->onMessage = 'on_message';
' m' V# L; G: e8 e. q - // 执行监听。正常监听不会报错
7 s# B9 o6 ^) `' z; G - $inner_worker->listen();
; \# F) @: J* d0 m7 k - };
3 ^; p$ {. L4 p6 G - ' c5 P( C9 o7 m& u$ u$ `' \
- $worker->onMessage = 'on_message';
( `) }8 W9 v. [6 |. M# }7 z
& N i2 b# w3 [$ U4 c- ^0 Y; Q- function on_message($connection, $data)
8 M) U( k; o5 H - {' W* e/ c7 @2 b- q( {/ U
- $connection->send("hello\n");% j0 V5 A: m0 E5 z3 D, X; u2 z
- }' s) z8 D& n% E9 H8 O; f
- 6 l2 \0 a0 p3 O3 G1 e
- // 运行worker
4 m0 S2 c" Q/ m$ _# ^ - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
) U4 N$ P- K8 e$ H% {0 ? - use Workerman\Worker;8 b" [# N1 S: A" X
- require_once './Workerman/Autoloader.php';/ d: r0 m; K2 ]5 D! I
- // 初始化一个worker容器,监听1234端口2 u6 A* e0 F1 c( {% O
- $worker = new Worker('websocket://0.0.0.0:1234');) h. {, e% ~! t: _( l& t( r
- ; M" x/ G' T: C' g; \: g' \8 P# q: L
- /*
, J0 t4 l \* O$ i, @" b, i# I6 k - * 注意这里进程数必须设置为1,否则会报端口占用错误
Y& ?) w. |0 T1 | - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
7 p* O' ~; F3 p4 c: a - */
/ F, _) x# ^) [7 n+ X - $worker->count = 1;6 l' e5 m& H B. B. e' m. i. ?& z
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
* C/ Z! a" N% h' L6 C- G7 j7 W - $worker->onWorkerStart = function($worker)
( L, [7 T$ \ a: T: a - {# D7 R3 }" E# S9 O& n
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符4 o8 h, T! z. j; q: @/ H+ J9 u) E
- $inner_text_worker = new Worker('text://0.0.0.0:5678');
6 e$ p7 b% Z! \( P - $inner_text_worker->onMessage = function($connection, $buffer)
& B0 |5 x1 c: X: \7 H2 P5 ]* u - {
) [6 R- w4 R p0 z0 m - // $data数组格式,里面有uid,表示向那个uid的页面推送数据- ]) f: L% G+ P2 v+ `) A
- $data = json_decode($buffer, true);
- U: ^7 l: M% n - $uid = $data['uid'];9 J$ y. `+ G: d7 |. k+ b
- // 通过workerman,向uid的页面推送数据
0 ^7 Y; i3 J! ^* o# G0 f3 n - $ret = sendMessageByUid($uid, $buffer);. Q& C W% h5 Z" j
- // 返回推送结果+ ?# V. u7 y, C4 \2 h. a3 H
- $connection->send($ret ? 'ok' : 'fail');
4 e/ k6 p; O: H! H3 B4 P - };
$ P( Y9 |- |+ w. Y+ q4 d - // ## 执行监听 ##
K# L; I7 n7 u - $inner_text_worker->listen();
, n" i. V6 y7 Y4 R) n, d - };
" o8 J% M. t) ? o - // 新增加一个属性,用来保存uid到connection的映射' p' r6 g4 t/ V
- $worker->uidConnections = array();- `+ l4 ^# ~+ U$ ^7 m
- // 当有客户端发来消息时执行的回调函数- c' D) m% M* D( J* I% _( C; X
- $worker->onMessage = function($connection, $data)
' u f% h, U1 ?4 j/ m - {
0 B1 S, [: {; q6 r- d& q' g - global $worker;
- D2 j) ^* B0 J; Q, V - // 判断当前客户端是否已经验证,既是否设置了uid3 o) \: w0 h2 {
- if(!isset($connection->uid))3 a) [+ B3 u* ?. Y6 q& K# L
- {2 m; z6 X6 y! v5 ]( v
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
! C l9 B& c2 y: c% G - $connection->uid = $data;4 ^5 t8 f1 o8 m7 U( V: z4 g$ C0 M
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,# Y6 Z9 C2 ]0 w7 [
- * 实现针对特定uid推送数据& {6 ^5 \ a( E! a& q
- */9 ]& s( z4 J$ d# l* w
- $worker->uidConnections[$connection->uid] = $connection;
. b: ]( T$ [6 q7 V9 C1 w! S - return;
. M3 w# r8 w9 ?! F3 P - }
' M. J& r5 w3 v6 N - };$ U# L1 ~9 g0 ^
- : W+ J0 P5 Z/ Y# L
- // 当有客户端连接断开时
1 @8 Z- |5 p8 ` F$ y3 y - $worker->onClose = function($connection)
- S8 p3 x7 l2 [$ f) {6 x - {
A' N" ]1 h1 c5 Z - global $worker;
6 z+ a6 F( @. ~& z! Y* [1 W - if(isset($connection->uid)): E$ B& J, n E* H- F( n. r
- {
) u% t! _; n* S& C# M8 c - // 连接断开时删除映射
6 O$ @# Z! p( D; N+ A& X - unset($worker->uidConnections[$connection->uid]);( `' D: M1 S+ L+ w$ K2 L5 N G
- }" p* f6 H' }/ @0 H6 t
- };4 _( f& I$ @% l2 |7 x8 }8 \: n
8 D+ `- Q" U; }$ h, Y$ Q. l- // 向所有验证的用户推送数据$ H. P! C8 v5 d, y
- function broadcast($message): G- C2 b) p* _# h
- {
: o6 y5 L3 ^8 [% \: Q - global $worker;
" N, h( `% I" q% I5 ?5 g - foreach($worker->uidConnections as $connection)9 [" Z5 h! I& F
- {% \. B/ j# K1 x# v# n/ w
- $connection->send($message);
, M3 P1 C. N% g [) B - }
]- \/ f! N2 [4 f9 w - }6 ?' ]" b) ?* a9 Z- P9 X
3 ]. d5 |4 J3 l# c5 m' \' d" z- // 针对uid推送数据
* m" m U% ~' i$ u0 Z; U - function sendMessageByUid($uid, $message)
! b: s& ~( S8 r+ P1 i2 i0 E - {
9 K0 Y# z; L6 b; q - global $worker;
' c4 X+ T: ^* D4 i; P - if(isset($worker->uidConnections[$uid]))
6 h, Z0 I& M ~ { - {
# E `1 A7 S. x$ i( }" t - $connection = $worker->uidConnections[$uid];$ ~, W, k* E' j5 E; U' {
- $connection->send($message);
+ u9 b( j- L" b/ Z$ k - return true;
6 g% ^. [" U! m% U1 P) W9 y - }' g. p3 d& |0 S3 f9 B
- return false;
$ P& l; b$ |6 U6 r* a - }+ V: r* h/ X4 y ? {1 S$ o
- 0 g/ } V4 ]- z* T# z
- // 运行所有的worker1 C- k9 P- F8 ]4 X
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');) I! N6 p( _) Z* a
- ws.onopen = function(){$ C' n. Y$ y. |9 Q! u
- var uid = 'uid1';
& C' R$ ~1 D$ A: U! z2 Z - ws.send(uid);, g: |6 S1 L$ U% K3 g. n
- };
" H4 c m' o4 h - ws.onmessage = function(e){8 I' M* c: N+ Y2 w; V
- alert(e.data);
6 W( J; _9 \4 p1 n! D- l4 T - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
! B' ?8 t" _7 g) F - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);$ L" q- y& Y/ J. Z# Y
- // 推送的数据,包含uid字段,表示是给这个uid推送, u3 P; [, m/ K7 f/ u7 @
- $data = array('uid'=>'uid1', 'percent'=>'88%');
$ q8 d7 G( e' j5 A6 n+ |# `0 @ - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
* J7 N4 X! L( U4 J2 o9 w - fwrite($client, json_encode($data)."\n");: i- s4 ~! N) g1 i0 K V6 ]5 s
- // 读取推送结果
# }; \1 C2 z$ {! m& }8 | - echo fread($client, 8192);
复制代码
- x4 e6 _, v& ?% }" H* i. ?, m7 N w9 n5 r
|