您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14819|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    8 |. `) x% |: I+ e) j! }" c
  2. require_once __DIR__ . '/Workerman/Autoloader.php';) R2 b4 {, Q) a& g# d6 m% F

  3. ) W" z3 T3 g- Y, H/ A6 W
  4. $worker = new Worker();
    0 P) X+ Z6 \. O4 y& [3 D7 \
  5. // 4个进程
    ' I* ~1 S# T, t% O0 U# z, F1 f
  6. $worker->count = 4;
    - d" q0 g5 D: u+ F! r1 d5 y
  7. // 每个进程启动后在当前进程新增一个Worker监听
    / t; i! p# f6 {1 `- k
  8. $worker->onWorkerStart = function($worker). l0 p0 K  r2 k2 ?7 O$ C, T  ?6 P
  9. {
    0 H4 y( _* O; D# l% C1 q
  10.     /**
    7 ?9 f3 \! o0 [$ x% X; |2 C
  11.      * 4个进程启动的时候都创建2016端口的Worker
    8 T7 {+ Y! M4 h- a3 G* P8 o" t
  12.      * 当执行到worker->listen()时会报Address already in use错误+ _0 K; H6 a, o: l+ w2 _' m
  13.      * 如果worker->count=1则不会报错4 f1 U+ [: @3 w* f  J6 q" Y+ Y
  14.      */
    ) n) o% `( P2 ?2 Y2 P
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');" ^* v8 q4 L" G7 A+ `) {( [8 D  x
  16.     $inner_worker->onMessage = 'on_message';( q8 f) [9 {1 i1 c) m
  17.     // 执行监听。这里会报Address already in use错误6 T: E: k7 `- V! z8 r( l1 t
  18.     $inner_worker->listen();
    2 V$ {8 J) ]+ @. t  k' O2 _5 s
  19. };. s& G% r/ K* s
  20. $ D' B) T% Z4 t+ h  C, m- k* I
  21. $worker->onMessage = 'on_message';* V0 f8 y0 g* X% B9 l( h8 u6 R
  22. $ I8 ~- o# M# m) X, f
  23. function on_message($connection, $data)
    - D% L. k! g$ c, T
  24. {8 Q; f( A7 m, O9 a3 P2 R' }
  25.     $connection->send("hello\n");
    2 \! W9 l  \5 M, B
  26. }) e+ G! i* l9 i* |2 k

  27. - U9 Y1 K, C, p% g/ ?8 ~
  28. // 运行worker
    ( K9 L( h' ]& }& s% E: q; {* f
  29. Worker::runAll();7 F5 R$ `3 j3 \% |) T5 d! r
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    ' i/ a6 B+ }) }; f3 E, e
  31. / s$ h! A; {- B! `& `! w9 S
  32. use Workerman\Worker;
    ' m% v& {$ H0 `( s) Q8 k
  33. require_once './Workerman/Autoloader.php';* J2 R" ~0 @& E. y
  34. & v! D" Z. f* b8 C8 P
  35. $worker = new Worker('text://0.0.0.0:2015');
    4 z' Y3 ~4 L  A& x1 ]1 |
  36. // 4个进程- K: g! i; E- r6 X" ^5 f! r: i% Q
  37. $worker->count = 4;
    $ Y5 d- }! H$ U9 ^0 \( q
  38. // 每个进程启动后在当前进程新增一个Worker监听- ^# x9 t' R7 |- r1 `7 q- _
  39. $worker->onWorkerStart = function($worker)
    % s" t9 }+ c5 G1 y- x
  40. {
    ; @+ G3 [7 I; _4 ]
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');9 R1 q! k  `3 q7 Y8 z( \
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    1 b* C4 c% S) }, o9 H8 R2 d
  43.     $inner_worker->reusePort = true;! P9 Z; S' Z9 _4 W
  44.     $inner_worker->onMessage = 'on_message';
    4 I, N  {  j% y" m
  45.     // 执行监听。正常监听不会报错
    : i& @7 D3 B7 C3 V. o
  46.     $inner_worker->listen();8 m  m) X8 Z) r! a3 t2 i& Q
  47. };, P1 ]9 |0 B( m6 O% e6 R1 s/ z

  48.   F1 a- \* ^/ P
  49. $worker->onMessage = 'on_message';
    " p7 u$ j% y" O, s

  50. ' n) g" f' d+ C: M& p
  51. function on_message($connection, $data)( Z* j  ?7 H8 d7 q9 b' r. `% \$ `: {& c
  52. {& @) J# ^% x+ d) W) x
  53.     $connection->send("hello\n");
    5 D  m! ^  a6 }) Z$ M$ a; ^) S
  54. }" h3 V. m2 [- |! \4 {
  55. 2 w4 O, Y8 e# a( g2 o8 {( w( C
  56. // 运行worker
    . k0 B' A! R9 D! _
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php, B7 R3 C, k8 d8 }1 H) [" L9 g; {
  2. use Workerman\Worker;
    ! L) N+ N6 }+ R  O' s2 N
  3. require_once './Workerman/Autoloader.php';
    1 x8 V, |, l1 |4 x( S3 A
  4. // 初始化一个worker容器,监听1234端口1 H7 {# t! b# S/ z9 o. ]' x" @
  5. $worker = new Worker('websocket://0.0.0.0:1234');! X' f6 `8 i3 y+ |3 K8 i# `
  6. 2 K- E# ^' g* Q1 C
  7. /*
    8 j* [  i7 j% N1 e. d; n5 d
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    - V# l, c3 O( M2 m- o6 ~
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true); v* ^2 W) V. i( t0 x2 B
  10. */
    ! ~1 H# n7 K+ ?# B
  11. $worker->count = 1;
    4 w) y/ l( Y  |3 Y* }
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口" O2 ?! B' r# w+ K* ^
  13. $worker->onWorkerStart = function($worker)
    . z8 Y& O6 ?0 c8 x' i' n
  14. {9 m" `# L% t; B4 L9 S
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    " d& ^4 F5 x! d8 L" d. K4 y( @
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');/ o& c! r  g7 m& s" ^6 f0 S
  17.     $inner_text_worker->onMessage = function($connection, $buffer)- W; C  a0 y  R0 K* e: F
  18.     {/ {/ y, K8 c" C9 Y. [
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    * i4 P/ g7 |0 g( Z6 f5 _4 g  d5 ?
  20.         $data = json_decode($buffer, true);6 _0 @; f, n. I6 [
  21.         $uid = $data['uid'];
    ( T" u6 y9 k; \& e" {
  22.         // 通过workerman,向uid的页面推送数据
    ) b' N* q. q- D, g
  23.         $ret = sendMessageByUid($uid, $buffer);5 g3 v( u. L, G
  24.         // 返回推送结果; |3 C6 l1 x" I6 ~" w. [  d* w
  25.         $connection->send($ret ? 'ok' : 'fail');
    / u2 E0 _, T1 A5 ?  w* l. r4 x0 n( F& [$ N
  26.     };  K5 k; n' v- t% f0 g$ }1 I
  27.     // ## 执行监听 ##
    3 ?- k) F7 Z9 ]
  28.     $inner_text_worker->listen();
    % I- w) t" w  m
  29. };
    % n- d# A3 K4 @7 a& Z" X
  30. // 新增加一个属性,用来保存uid到connection的映射
    ! r) P# z0 U+ s: m
  31. $worker->uidConnections = array();
    # t  {% u( }+ W- A% b
  32. // 当有客户端发来消息时执行的回调函数
    ( @. K! P; k8 ?4 M
  33. $worker->onMessage = function($connection, $data)- }' X% G( d6 N+ E! u  `+ M
  34. {
    ! C; C! S& R/ K) c. o
  35.     global $worker;5 X4 c1 L. q1 |9 i; d' i! B3 N' _
  36.     // 判断当前客户端是否已经验证,既是否设置了uid% Z; p- c; W$ |8 B1 H. p/ E3 G
  37.     if(!isset($connection->uid))0 G3 I7 J* x7 }; F4 o: g
  38.     {
    9 d! u: O* N. p/ k, h
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证), [9 Q& G' U+ f  |$ P& N
  40.        $connection->uid = $data;8 c: h4 E, Q2 I5 E) e) c
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    9 r+ S: _1 u7 \4 J' ]3 v8 ]
  42.         * 实现针对特定uid推送数据; M( o$ e2 m% t$ J/ I# c5 A$ `# Q9 ?4 n
  43.         */
    + ]: Y1 a' X. R+ I/ V" I
  44.        $worker->uidConnections[$connection->uid] = $connection;
    ; W  c! j' O: E8 v6 ]2 {8 E9 s
  45.        return;
    2 ]4 d& n' \& r7 S& f2 H5 j
  46.     }- a" @! Q* v0 ?
  47. };
    ( S8 L. Y) b3 N  H% {8 z
  48. 5 z: u- G2 D" X/ U3 l
  49. // 当有客户端连接断开时
    & c/ v, ^/ ~# a( f# F: q. f6 a
  50. $worker->onClose = function($connection): g; D. m7 b2 k, V
  51. {1 T4 A0 v- R8 a  P8 c
  52.     global $worker;
    & Z& d/ F4 F* |1 P; n: [
  53.     if(isset($connection->uid))
    ! g7 Y! k: J/ l7 i  e7 h+ \/ U
  54.     {4 r6 x2 t) P) R% O, c+ x1 ?
  55.         // 连接断开时删除映射- p9 h- @' I- e7 k
  56.         unset($worker->uidConnections[$connection->uid]);
    , V; d' [9 P. B% C0 q- y% S
  57.     }
    4 j$ }' ]/ W" r% `$ F
  58. };" @2 o1 V4 m- E2 o3 W$ u/ J
  59. : v2 ~* Z7 O2 h3 t* A. C
  60. // 向所有验证的用户推送数据1 E; }* I7 ~8 ]% d# h, ^8 j- w
  61. function broadcast($message)
    . P* |/ l+ U* M
  62. {2 U0 N- H7 G+ L% v  T
  63.    global $worker;
    + N; l( K8 J  g  s( s6 g5 m
  64.    foreach($worker->uidConnections as $connection)7 m! u% v. O9 Y8 a
  65.    {
    4 f0 n8 l" U9 C
  66.         $connection->send($message);
    / s2 n3 R& S0 m6 z3 i  v
  67.    }
    * F4 [' P6 _7 l1 V+ S
  68. }
      y2 k/ q1 ]4 J
  69. / |* x. F0 {9 L' e* T, R
  70. // 针对uid推送数据
    5 p/ o. i* k- O/ d0 b
  71. function sendMessageByUid($uid, $message)
    8 p7 l* g9 Z3 r
  72. {
    1 L3 j* o: ~+ Q8 e/ X  D( A1 d' q
  73.     global $worker;
    8 }) y  {( `/ D; t9 w+ P/ u+ H
  74.     if(isset($worker->uidConnections[$uid]))
    2 ]: l% O; Q( ^) F: v' h
  75.     {
    , Z* ^6 _7 u5 ]1 }/ P7 l
  76.         $connection = $worker->uidConnections[$uid];& c; L; }. U% r; K' r
  77.         $connection->send($message);' X$ i; ~, Y, P: l. t) m% D" P* T
  78.         return true;3 L- X# m  O2 M4 o3 f& s
  79.     }' O  p* X& {) r+ ]7 F" R
  80.     return false;4 g7 n) y1 w6 w8 b! w5 \  `7 ~' V* P
  81. }0 }8 x4 I4 q* s$ R3 D
  82. % b1 `( a) K6 n, \
  83. // 运行所有的worker
    5 z6 f1 E& o/ m2 {/ N
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    2 H. [0 o2 l3 ?8 _# n3 Z* z
  2. ws.onopen = function(){+ o  y* i# |2 Y5 S
  3.     var uid = 'uid1';! v; O$ c$ m8 w  _9 @/ X
  4.     ws.send(uid);$ t' i7 b( V% {: Q! t) o9 u
  5. };
    * J$ I! c& L; U8 g
  6. ws.onmessage = function(e){
    ! G1 D0 o  u5 v
  7.     alert(e.data);
    . b) `$ B1 \( e  v% ?
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口2 d* X& x' C  f+ L/ L) Q
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);* h% t4 c) I6 V
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    0 j2 f( h8 {$ u( z% u) E5 q
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');! Q1 o2 P4 j: {$ T) R
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    ! z, [# E, A0 E6 n
  6. fwrite($client, json_encode($data)."\n");& V9 X5 x$ b: [% a' T0 L5 @. z9 U% q" W
  7. // 读取推送结果
    - z% b" n# S7 y$ I
  8. echo fread($client, 8192);
复制代码

/ _; T# L0 B$ A8 N7 _) |; O5 u, ~- F5 v: c9 Y
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-3-17 22:58 , Processed in 0.062248 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!